You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/18 00:36:18 UTC
[50/50] [abbrv] hadoop git commit: YARN-2696. Queue sorting in
CapacityScheduler should consider node label. Contributed by Wangda Tan
YARN-2696. Queue sorting in CapacityScheduler should consider node label. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4459349c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4459349c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4459349c
Branch: refs/heads/YARN-2928
Commit: 4459349c8905062d137e86ad6fcad108d335fa76
Parents: 2353676
Author: Jian He <ji...@apache.org>
Authored: Fri Apr 17 13:36:46 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Fri Apr 17 15:31:30 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../dev-support/findbugs-exclude.xml | 6 +-
.../nodelabels/RMNodeLabelsManager.java | 2 +-
.../scheduler/AbstractYarnScheduler.java | 4 +
.../scheduler/ResourceUsage.java | 10 +
.../scheduler/capacity/AbstractCSQueue.java | 12 +-
.../scheduler/capacity/CSQueueUtils.java | 129 ++++--
.../scheduler/capacity/CapacityScheduler.java | 15 +-
.../capacity/CapacitySchedulerContext.java | 4 +-
.../scheduler/capacity/LeafQueue.java | 5 +-
.../scheduler/capacity/ParentQueue.java | 46 +-
.../capacity/PartitionedQueueComparator.java | 68 +++
.../scheduler/capacity/QueueCapacities.java | 11 +-
.../scheduler/capacity/ReservationQueue.java | 7 +-
.../capacity/TestApplicationLimits.java | 12 +-
.../scheduler/capacity/TestChildQueueOrder.java | 4 +-
.../scheduler/capacity/TestLeafQueue.java | 4 +-
.../TestNodeLabelContainerAllocation.java | 451 +++++++++++++++++++
.../scheduler/capacity/TestParentQueue.java | 4 +-
.../scheduler/capacity/TestReservations.java | 4 +-
.../scheduler/fifo/TestFifoScheduler.java | 4 +
21 files changed, 722 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6749b4e..415cf9c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -198,6 +198,9 @@ Release 2.8.0 - UNRELEASED
YARN-3404. Display queue name on application page. (Ryu Kobayashi via jianhe)
+ YARN-2696. Queue sorting in CapacityScheduler should consider node label.
+ (Wangda Tan via jianhe)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 4b01a4d..8aae152 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -141,10 +141,14 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.resource.Priority$Comparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
- <Match>
+ <Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" />
+ <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+ </Match>
<!-- Ignore some irrelevant class name warning -->
<Match>
<Class name="org.apache.hadoop.yarn.api.records.SerializedException" />
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
index 574e24c..25e5bc09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
@@ -254,7 +254,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
}
- public void updateNodeResource(NodeId node, Resource newResource) throws IOException {
+ public void updateNodeResource(NodeId node, Resource newResource) {
deactivateNode(node);
activateNode(node, newResource);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 6699b05..1a8c653 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -548,6 +548,10 @@ public abstract class AbstractYarnScheduler
Resource newResource = resourceOption.getResource();
Resource oldResource = node.getTotalResource();
if(!oldResource.equals(newResource)) {
+ // Notify NodeLabelsManager about this change
+ rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
+ newResource);
+
// Log resource change
LOG.info("Update resource on node: " + node.getNodeName()
+ " from: " + oldResource + ", to: "
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 2f7e19d..88e93c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -372,4 +373,13 @@ public class ResourceUsage {
readLock.unlock();
}
}
+
+ public Set<String> getNodePartitionsSet() {
+ try {
+ readLock.lock();
+ return usages.keySet();
+ } finally {
+ readLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index d95c45c..550c6aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -271,8 +271,8 @@ public abstract class AbstractCSQueue implements CSQueue {
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
// Update metrics
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, parent, clusterResource, minimumAllocation);
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+ minimumAllocation, this, labelManager, null);
// Check if labels of this queue is a subset of parent queue, only do this
// when we not root
@@ -351,16 +351,16 @@ public abstract class AbstractCSQueue implements CSQueue {
queueUsage.incUsed(nodePartition, resource);
++numContainers;
- CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
- clusterResource, minimumAllocation);
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+ minimumAllocation, this, labelManager, nodePartition);
}
protected synchronized void releaseResource(Resource clusterResource,
Resource resource, String nodePartition) {
queueUsage.decUsed(nodePartition, resource);
- CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
- clusterResource, minimumAllocation);
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+ minimumAllocation, this, labelManager, nodePartition);
--numContainers;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 1921195..8f9362e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -20,18 +20,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.HashSet;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.collect.Sets;
+
class CSQueueUtils {
-
- private static final Log LOG = LogFactory.getLog(CSQueueUtils.class);
final static float EPSILON = 0.0001f;
@@ -188,41 +187,103 @@ class CSQueueUtils {
}
}
- @Lock(CSQueue.class)
- public static void updateQueueStatistics(
- final ResourceCalculator calculator,
- final CSQueue childQueue, final CSQueue parentQueue,
- final Resource clusterResource, final Resource minimumAllocation) {
- Resource queueLimit = Resources.none();
- Resource usedResources = childQueue.getUsedResources();
-
+ /**
+ * Update used-capacity and absolute-used-capacity of the given partition for
+ * this queue; callers iterate over partitions when nodePartition == null.
+ */
+ private static void updateUsedCapacity(final ResourceCalculator rc,
+ final Resource totalPartitionResource, final Resource minimumAllocation,
+ ResourceUsage queueResourceUsage, QueueCapacities queueCapacities,
+ String nodePartition) {
float absoluteUsedCapacity = 0.0f;
float usedCapacity = 0.0f;
- if (Resources.greaterThan(
- calculator, clusterResource, clusterResource, Resources.none())) {
- queueLimit =
- Resources.multiply(clusterResource, childQueue.getAbsoluteCapacity());
- absoluteUsedCapacity =
- Resources.divide(calculator, clusterResource,
- usedResources, clusterResource);
- usedCapacity =
- Resources.equals(queueLimit, Resources.none()) ? 0 :
- Resources.divide(calculator, clusterResource,
- usedResources, queueLimit);
+ if (Resources.greaterThan(rc, totalPartitionResource,
+ totalPartitionResource, Resources.none())) {
+ // queueGuaranteed = totalPartitionedResource *
+ // absolute_capacity(partition)
+ Resource queueGuranteedResource =
+ Resources.multiply(totalPartitionResource,
+ queueCapacities.getAbsoluteCapacity(nodePartition));
+
+ // make queueGuranteedResource >= minimum_allocation to avoid division by 0.
+ queueGuranteedResource =
+ Resources.max(rc, totalPartitionResource, queueGuranteedResource,
+ minimumAllocation);
+
+ Resource usedResource = queueResourceUsage.getUsed(nodePartition);
+ absoluteUsedCapacity =
+ Resources.divide(rc, totalPartitionResource, usedResource,
+ totalPartitionResource);
+ usedCapacity =
+ Resources.divide(rc, totalPartitionResource, usedResource,
+ queueGuranteedResource);
+ }
+
+ queueCapacities
+ .setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity);
+ queueCapacities.setUsedCapacity(nodePartition, usedCapacity);
+ }
+
+ private static Resource getNonPartitionedMaxAvailableResourceToQueue(
+ final ResourceCalculator rc, Resource totalNonPartitionedResource,
+ CSQueue queue) {
+ Resource queueLimit = Resources.none();
+ Resource usedResources = queue.getUsedResources();
+
+ if (Resources.greaterThan(rc, totalNonPartitionedResource,
+ totalNonPartitionedResource, Resources.none())) {
+ queueLimit =
+ Resources.multiply(totalNonPartitionedResource,
+ queue.getAbsoluteCapacity());
}
- childQueue.setUsedCapacity(usedCapacity);
- childQueue.setAbsoluteUsedCapacity(absoluteUsedCapacity);
-
Resource available = Resources.subtract(queueLimit, usedResources);
- childQueue.getMetrics().setAvailableResourcesToQueue(
- Resources.max(
- calculator,
- clusterResource,
- available,
- Resources.none()
- )
- );
+ return Resources.max(rc, totalNonPartitionedResource, available,
+ Resources.none());
+ }
+
+ /**
+ * <p>
+ * Update Queue Statistics:
+ * </p>
+ *
+ * <li>used-capacity/absolute-used-capacity by partition</li>
+ * <li>non-partitioned max-avail-resource to queue</li>
+ *
+ * <p>
+ * When nodePartition is null, used-capacity/absolute-used-capacity of all
+ * partitions will be updated.
+ * </p>
+ */
+ @Lock(CSQueue.class)
+ public static void updateQueueStatistics(
+ final ResourceCalculator rc, final Resource cluster, final Resource minimumAllocation,
+ final CSQueue childQueue, final RMNodeLabelsManager nlm,
+ final String nodePartition) {
+ QueueCapacities queueCapacities = childQueue.getQueueCapacities();
+ ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
+
+ if (nodePartition == null) {
+ for (String partition : Sets.union(
+ queueCapacities.getNodePartitionsSet(),
+ queueResourceUsage.getNodePartitionsSet())) {
+ updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
+ minimumAllocation, queueResourceUsage, queueCapacities, partition);
+ }
+ } else {
+ updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
+ minimumAllocation, queueResourceUsage, queueCapacities, nodePartition);
+ }
+
+ // Now in QueueMetrics, we only store available-resource-to-queue for
+ // default partition.
+ if (nodePartition == null
+ || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ childQueue.getMetrics().setAvailableResourcesToQueue(
+ getNonPartitionedMaxAvailableResourceToQueue(rc,
+ nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster),
+ childQueue));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cfeee37..5d58b15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -136,7 +136,8 @@ public class CapacityScheduler extends
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
- static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
+ static final Comparator<CSQueue> nonPartitionedQueueComparator =
+ new Comparator<CSQueue>() {
@Override
public int compare(CSQueue q1, CSQueue q2) {
if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
@@ -148,6 +149,9 @@ public class CapacityScheduler extends
return q1.getQueuePath().compareTo(q2.getQueuePath());
}
};
+
+ static final PartitionedQueueComparator partitionedQueueComparator =
+ new PartitionedQueueComparator();
static final Comparator<FiCaSchedulerApp> applicationComparator =
new Comparator<FiCaSchedulerApp>() {
@@ -274,8 +278,13 @@ public class CapacityScheduler extends
}
@Override
- public Comparator<CSQueue> getQueueComparator() {
- return queueComparator;
+ public Comparator<CSQueue> getNonPartitionedQueueComparator() {
+ return nonPartitionedQueueComparator;
+ }
+
+ @Override
+ public PartitionedQueueComparator getPartitionedQueueComparator() {
+ return partitionedQueueComparator;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 28dc988..707c463 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -58,7 +58,9 @@ public interface CapacitySchedulerContext {
ResourceCalculator getResourceCalculator();
- Comparator<CSQueue> getQueueComparator();
+ Comparator<CSQueue> getNonPartitionedQueueComparator();
+
+ PartitionedQueueComparator getPartitionedQueueComparator();
FiCaSchedulerNode getNode(NodeId nodeId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 8a6a601..f860574 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1814,9 +1814,8 @@ public class LeafQueue extends AbstractCSQueue {
setQueueResourceLimitsInfo(clusterResource);
// Update metrics
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, getParent(), clusterResource,
- minimumAllocation);
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+ minimumAllocation, this, labelManager, null);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index eb64d43..53142b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -68,7 +69,8 @@ public class ParentQueue extends AbstractCSQueue {
protected final Set<CSQueue> childQueues;
private final boolean rootQueue;
- final Comparator<CSQueue> queueComparator;
+ final Comparator<CSQueue> nonPartitionedQueueComparator;
+ final PartitionedQueueComparator partitionQueueComparator;
volatile int numApplications;
private final CapacitySchedulerContext scheduler;
@@ -79,7 +81,8 @@ public class ParentQueue extends AbstractCSQueue {
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.scheduler = cs;
- this.queueComparator = cs.getQueueComparator();
+ this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator();
+ this.partitionQueueComparator = cs.getPartitionedQueueComparator();
this.rootQueue = (parent == null);
@@ -92,7 +95,7 @@ public class ParentQueue extends AbstractCSQueue {
". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
}
- this.childQueues = new TreeSet<CSQueue>(queueComparator);
+ this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
setupQueueConfigs(cs.getClusterResource());
@@ -522,6 +525,17 @@ public class ParentQueue extends AbstractCSQueue {
return new ResourceLimits(childLimit);
}
+ private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) {
+ if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+ return childQueues.iterator();
+ }
+
+ partitionQueueComparator.setPartitionToLookAt(node.getPartition());
+ List<CSQueue> childrenList = new ArrayList<>(childQueues);
+ Collections.sort(childrenList, partitionQueueComparator);
+ return childrenList.iterator();
+ }
+
private synchronized CSAssignment assignContainersToChildQueues(
Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
SchedulingMode schedulingMode) {
@@ -531,7 +545,8 @@ public class ParentQueue extends AbstractCSQueue {
printChildQueues();
// Try to assign to most 'under-served' sub-queue
- for (Iterator<CSQueue> iter = childQueues.iterator(); iter.hasNext();) {
+ for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(node); iter
+ .hasNext();) {
CSQueue childQueue = iter.next();
if(LOG.isDebugEnabled()) {
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
@@ -554,13 +569,17 @@ public class ParentQueue extends AbstractCSQueue {
if (Resources.greaterThan(
resourceCalculator, cluster,
assignment.getResource(), Resources.none())) {
- // Remove and re-insert to sort
- iter.remove();
- LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() +
- " stats: " + childQueue);
- childQueues.add(childQueue);
- if (LOG.isDebugEnabled()) {
- printChildQueues();
+ // Only update childQueues when we are doing non-partitioned node
+ // allocation.
+ if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
+ // Remove and re-insert to sort
+ iter.remove();
+ LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath()
+ + " stats: " + childQueue);
+ childQueues.add(childQueue);
+ if (LOG.isDebugEnabled()) {
+ printChildQueues();
+ }
}
break;
}
@@ -647,9 +666,8 @@ public class ParentQueue extends AbstractCSQueue {
childQueue.updateClusterResource(clusterResource, childLimits);
}
- // Update metrics
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, parent, clusterResource, minimumAllocation);
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+ minimumAllocation, this, labelManager, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
new file mode 100644
index 0000000..ddcc719
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java
@@ -0,0 +1,68 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.Comparator;
+
+public class PartitionedQueueComparator implements Comparator<CSQueue> {
+ private String partitionToLookAt = null;
+
+ public void setPartitionToLookAt(String partitionToLookAt) {
+ this.partitionToLookAt = partitionToLookAt;
+ }
+
+
+ @Override
+ public int compare(CSQueue q1, CSQueue q2) {
+ /*
+ * 1. Check accessibility to the given partition: if one queue is accessible
+ * and the other is not, the accessible queue goes first.
+ */
+ boolean q1Accessible =
+ q1.getAccessibleNodeLabels().contains(partitionToLookAt);
+ boolean q2Accessible =
+ q2.getAccessibleNodeLabels().contains(partitionToLookAt);
+ if (q1Accessible && !q2Accessible) {
+ return -1;
+ } else if (!q1Accessible && q2Accessible) {
+ return 1;
+ }
+
+ /*
+ *
+ * 2. When two queues have the same accessibility, check which goes first:
+ * for now we simply compare their used capacity on the partition to look at.
+ */
+ float used1 = q1.getQueueCapacities().getUsedCapacity(partitionToLookAt);
+ float used2 = q2.getQueueCapacities().getUsedCapacity(partitionToLookAt);
+ if (Math.abs(used1 - used2) < 1e-6) {
+ // When used capacity is same, compare their guaranteed-capacity
+ float cap1 = q1.getQueueCapacities().getCapacity(partitionToLookAt);
+ float cap2 = q2.getQueueCapacities().getCapacity(partitionToLookAt);
+
+ // when cap1 == cap2, we will compare queue's name
+ if (Math.abs(cap1 - cap2) < 1e-6) {
+ return q1.getQueueName().compareTo(q2.getQueueName());
+ }
+ return Float.compare(cap2, cap1);
+ }
+
+ return Float.compare(used1, used2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
index 962a636..d0a26d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
@@ -30,8 +30,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import com.google.common.collect.Sets;
-
public class QueueCapacities {
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
private static final float LABEL_DOESNT_EXIST_CAP = 0f;
@@ -254,4 +252,13 @@ public class QueueCapacities {
readLock.unlock();
}
}
+
+ public Set<String> getNodePartitionsSet() { // partition names that have an entry in capacitiesMap
+ readLock.lock(); // acquire before try so finally never unlocks a lock that was not taken
+ try {
+ return new java.util.HashSet<String>(capacitiesMap.keySet()); // snapshot: keySet() is a live view that would escape the lock
+ } finally {
+ readLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index a8d17cf..4790cc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -63,10 +63,9 @@ public class ReservationQueue extends LeafQueue {
+ " from " + newlyParsedQueue.getQueuePath());
}
super.reinitialize(newlyParsedQueue, clusterResource);
- CSQueueUtils.updateQueueStatistics(
- parent.schedulerContext.getResourceCalculator(), newlyParsedQueue,
- parent, parent.schedulerContext.getClusterResource(),
- parent.schedulerContext.getMinimumResourceCapability());
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+ minimumAllocation, this, labelManager, null);
+
updateQuotas(parent.getUserLimitForReservation(),
parent.getUserLimitFactor(),
parent.getMaxApplicationsForReservations(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 46167ca..a41fdfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -95,8 +95,8 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
- when(csContext.getQueueComparator()).
- thenReturn(CapacityScheduler.queueComparator);
+ when(csContext.getNonPartitionedQueueComparator()).
+ thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@@ -255,8 +255,8 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(16*GB, 16));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
- when(csContext.getQueueComparator()).
- thenReturn(CapacityScheduler.queueComparator);
+ when(csContext.getNonPartitionedQueueComparator()).
+ thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@@ -554,8 +554,8 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(16*GB));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
- when(csContext.getQueueComparator()).
- thenReturn(CapacityScheduler.queueComparator);
+ when(csContext.getNonPartitionedQueueComparator()).
+ thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 970a98a..2608dcb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -96,8 +96,8 @@ public class TestChildQueueOrder {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
- when(csContext.getQueueComparator()).
- thenReturn(CapacityScheduler.queueComparator);
+ when(csContext.getNonPartitionedQueueComparator()).
+ thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 0b5250b..0a19604 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -152,8 +152,8 @@ public class TestLeafQueue {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
- when(csContext.getQueueComparator()).
- thenReturn(CapacityScheduler.queueComparator);
+ when(csContext.getNonPartitionedQueueComparator()).
+ thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index cf1b26f..5155db5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -1024,4 +1024,455 @@ public class TestNodeLabelContainerAllocation {
rm1.close();
}
+
+ private void checkQueueUsedCapacity(String queueName, CapacityScheduler cs,
+ String nodePartition, float usedCapacity, float absoluteUsedCapacity) {
+ float epsilon = 1e-6f;
+ CSQueue queue = cs.getQueue(queueName);
+ Assert.assertNotNull("Failed to get queue=" + queueName, queue);
+
+ Assert.assertEquals(usedCapacity, queue.getQueueCapacities()
+ .getUsedCapacity(nodePartition), epsilon);
+ Assert.assertEquals(absoluteUsedCapacity, queue.getQueueCapacities()
+ .getAbsoluteUsedCapacity(nodePartition), epsilon);
+ }
+
+ private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) {
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nodeId);
+ for (int i = 0; i < nHeartbeat; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ }
+
+ private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
+ throws InterruptedException {
+ int totalWaitTick = 100; // wait 10 sec at most.
+ while (expectedNodeNum > rm.getResourceScheduler().getNumClusterNodes()
+ && totalWaitTick > 0) {
+ Thread.sleep(100);
+ totalWaitTick--;
+ }
+ }
+
+ @Test
+ public void testQueueUsedCapacitiesUpdate()
+ throws Exception {
+ /**
+ * Test case: given the following queue structure:
+ *
+ * <pre>
+ *            root
+ *          /      \
+ *         a        b
+ *        / \      (x)
+ *      a1   a2
+ *     (x)   (x)
+ * </pre>
+ *
+ * Both a and b can access x. We need to verify that when
+ * <pre>
+ * 1) containers are allocated/released on both partitioned/non-partitioned nodes,
+ * 2) the cluster resource is updated,
+ * 3) a queue's guaranteed resource is changed,
+ * </pre>
+ *
+ * the used capacity / absolute used capacity of the queues are correctly updated.
+ */
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(this.conf);
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+ "b" });
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ /**
+ * Initially, we set A/B's resource 50:50
+ */
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(A, 50);
+ csConf.setAccessibleNodeLabels(A, toSet("x"));
+ csConf.setCapacityByLabel(A, "x", 50);
+
+ csConf.setQueues(A, new String[] { "a1", "a2" });
+
+ final String A1 = A + ".a1";
+ csConf.setCapacity(A1, 50);
+ csConf.setAccessibleNodeLabels(A1, toSet("x"));
+ csConf.setCapacityByLabel(A1, "x", 50);
+
+ final String A2 = A + ".a2";
+ csConf.setCapacity(A2, 50);
+ csConf.setAccessibleNodeLabels(A2, toSet("x"));
+ csConf.setCapacityByLabel(A2, "x", 50);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ csConf.setCapacity(B, 50);
+ csConf.setAccessibleNodeLabels(B, toSet("x"));
+ csConf.setCapacityByLabel(B, "x", 50);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+ // Make x a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.getRMContext().setNodeLabelManager(mgr);
+ rm.start();
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ /*
+ * Before we add any node to the cluster, used-capacity/abs-used-capacity
+ * should be 0
+ */
+ checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "", 0f, 0f);
+
+ MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
+ MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+ /*
+ * After we add nodes to the cluster, and before starting to use them,
+ * used-capacity/abs-used-capacity should be 0
+ */
+ checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "", 0f, 0f);
+
+ // app1 -> a1
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+ // app1 asks for 1 container with no node-label expression (partition=<empty>)
+ am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+
+ doNMHeartbeat(rm, nm2.getNodeId(), 10);
+
+ // Now check usage, app1 uses:
+ // a1: used(no-label) = 80%
+ // abs-used(no-label) = 20%
+ // a: used(no-label) = 40%
+ // abs-used(no-label) = 20%
+ // root: used(no-label) = 20%
+ // abs-used(no-label) = 20%
+ checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
+ checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
+ checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
+
+ // app1 asks for 2 partition=x containers
+ am1.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "x");
+ doNMHeartbeat(rm, nm1.getNodeId(), 10);
+
+ // Now check usage, app1 uses:
+ // a1: used(x) = 80%
+ // abs-used(x) = 20%
+ // a: used(x) = 40%
+ // abs-used(x) = 20%
+ // root: used(x) = 20%
+ // abs-used(x) = 20%
+ checkQueueUsedCapacity("a", cs, "x", 0.4f, 0.2f);
+ checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
+ checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
+ checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
+ checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "x", 0.2f, 0.2f);
+ checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
+
+ // submit an app to a2, uses 1 NON_PARTITIONED container and 1 PARTITIONED
+ // container
+ // app2 -> a2
+ RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "a2");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+ // app2 asks for 1 partition=x container
+ am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+ doNMHeartbeat(rm, nm1.getNodeId(), 10);
+
+ // Now check usage, app2 uses:
+ // a2: used(x) = 40%
+ // abs-used(x) = 10%
+ // a: used(x) = 20%
+ // abs-used(x) = 10%
+ // root: used(x) = 10%
+ // abs-used(x) = 10%
+ checkQueueUsedCapacity("a", cs, "x", 0.6f, 0.3f);
+ checkQueueUsedCapacity("a", cs, "", 0.6f, 0.3f);
+ checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
+ checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
+ checkQueueUsedCapacity("a2", cs, "x", 0.4f, 0.1f);
+ checkQueueUsedCapacity("a2", cs, "", 0.4f, 0.1f);
+ checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "x", 0.3f, 0.3f);
+ checkQueueUsedCapacity("root", cs, "", 0.3f, 0.3f);
+
+ // Add nm3/nm4, double resource for both partitioned/non-partitioned
+ // resource, used capacity should be 1/2 of before
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h3", 0), toSet("x")));
+ rm.registerNode("h3:1234", 10 * GB); // label = x
+ rm.registerNode("h4:1234", 10 * GB); // label = <empty>
+
+ waitSchedulerNodeJoined(rm, 4);
+
+ checkQueueUsedCapacity("a", cs, "x", 0.3f, 0.15f);
+ checkQueueUsedCapacity("a", cs, "", 0.3f, 0.15f);
+ checkQueueUsedCapacity("a1", cs, "x", 0.4f, 0.1f);
+ checkQueueUsedCapacity("a1", cs, "", 0.4f, 0.1f);
+ checkQueueUsedCapacity("a2", cs, "x", 0.2f, 0.05f);
+ checkQueueUsedCapacity("a2", cs, "", 0.2f, 0.05f);
+ checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
+ checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
+
+ // Reinitialize queues: double A's capacity and set B's capacity to 0
+ csConf.setCapacity(A, 100); // was 50
+ csConf.setCapacityByLabel(A, "x", 100); // was 50
+ csConf.setCapacity(B, 0); // was 50
+ csConf.setCapacityByLabel(B, "x", 0); // was 50
+ cs.reinitialize(csConf, rm.getRMContext());
+
+ checkQueueUsedCapacity("a", cs, "x", 0.15f, 0.15f);
+ checkQueueUsedCapacity("a", cs, "", 0.15f, 0.15f);
+ checkQueueUsedCapacity("a1", cs, "x", 0.2f, 0.1f);
+ checkQueueUsedCapacity("a1", cs, "", 0.2f, 0.1f);
+ checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
+ checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
+ checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
+ checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
+
+ // Release all task containers from a1, check usage
+ am1.allocate(null, Arrays.asList(
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2),
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 3),
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 4)));
+ checkQueueUsedCapacity("a", cs, "x", 0.05f, 0.05f);
+ checkQueueUsedCapacity("a", cs, "", 0.10f, 0.10f);
+ checkQueueUsedCapacity("a1", cs, "x", 0.0f, 0.0f);
+ checkQueueUsedCapacity("a1", cs, "", 0.1f, 0.05f);
+ checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
+ checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
+ checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
+ checkQueueUsedCapacity("b", cs, "", 0f, 0f);
+ checkQueueUsedCapacity("root", cs, "x", 0.05f, 0.05f);
+ checkQueueUsedCapacity("root", cs, "", 0.10f, 0.10f);
+
+ rm.close();
+ }
+
+ @Test
+ public void testOrderOfAllocationOnPartitions()
+ throws Exception {
+ /**
+ * Test case: given the following queue structure:
+ *
+ * <pre>
+ *                root
+ *      ________________
+ *     /     |     \    \
+ *   a (x)  b (x)   c    d
+ * </pre>
+ *
+ * Both a and b can access x. We need to verify that
+ * <pre>
+ * when doing allocation on partitioned nodes,
+ * - a queue that can access the node's partition goes first
+ * - when accessibility is the same
+ *   - the queue with less used_capacity on the given partition goes first
+ *   - when used_capacity is the same
+ *     - the queue with more capacity on that partition goes first
+ * </pre>
+ *
+ * The order is verified via each app's container count on the partitioned node.
+ */
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(this.conf);
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+ "b", "c", "d" });
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(A, 25);
+ csConf.setAccessibleNodeLabels(A, toSet("x"));
+ csConf.setCapacityByLabel(A, "x", 30);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ csConf.setCapacity(B, 25);
+ csConf.setAccessibleNodeLabels(B, toSet("x"));
+ csConf.setCapacityByLabel(B, "x", 70);
+
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ csConf.setCapacity(C, 25);
+
+ final String D = CapacitySchedulerConfiguration.ROOT + ".d";
+ csConf.setCapacity(D, 25);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+ // Make x a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.getRMContext().setNodeLabelManager(mgr);
+ rm.start();
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
+ MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+ // app1 -> a
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+ // app2 -> b
+ RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+ // app3 -> c
+ RMApp app3 = rm.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm2);
+
+ // app4 -> d
+ RMApp app4 = rm.submitApp(1 * GB, "app", "user", null, "d");
+ MockAM am4 = MockRM.launchAndRegisterAM(app4, rm, nm2);
+
+ // Test case 1
+ // Both a and b have used_capacity(x) = 0; when doing exclusive allocation, b
+ // goes first since b has more capacity(x)
+ am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+ am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+ doNMHeartbeat(rm, nm1.getNodeId(), 1);
+ checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+ cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+
+ // Test case 2
+ // Do another allocation; a goes first since it has 0 used_capacity(x) and
+ // b has 1/7 used_capacity(x)
+ am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
+ doNMHeartbeat(rm, nm1.getNodeId(), 1);
+ checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+ cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+
+ // Test case 3
+ // Just like above, when doing non-exclusive allocation, b will go first as well.
+ am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
+ am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
+ doNMHeartbeat(rm, nm1.getNodeId(), 2);
+ checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+ cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+ cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+
+ // Test case 4
+ // After b is allocated, we should be able to allocate a non-exclusive container in a
+ doNMHeartbeat(rm, nm1.getNodeId(), 2);
+ checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+ cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+ cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+
+ // Test case 5
+ // b/c/d ask for non-exclusive containers together; b goes first regardless of
+ // used_capacity(x)
+ am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
+ am3.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
+ am4.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
+ doNMHeartbeat(rm, nm1.getNodeId(), 2);
+ checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+ cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+ cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
+ cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
+ cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+
+ // Test case 6
+ // After b allocated, c will go first by lexicographic order
+ doNMHeartbeat(rm, nm1.getNodeId(), 1);
+ checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+ cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+ cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+ cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
+ cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+
+ // Test case 7
+ // After c allocated, d will go first because it has less used_capacity(x)
+ // than c
+ doNMHeartbeat(rm, nm1.getNodeId(), 2);
+ checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+ cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+ cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+ cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+ cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+
+ // Test case 8
+ // After d is allocated, c goes first: c/d have the same used_capacity(x), so they are ordered lexicographically by name
+ doNMHeartbeat(rm, nm1.getNodeId(), 1);
+ checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+ cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
+ cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
+ cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+ checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
+ cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 52d0bc1..bdbd168 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -92,8 +92,8 @@ public class TestParentQueue {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
- when(csContext.getQueueComparator()).
- thenReturn(CapacityScheduler.queueComparator);
+ when(csContext.getNonPartitionedQueueComparator()).
+ thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 47be618..fc546ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -122,8 +122,8 @@ public class TestReservations {
Resources.createResource(100 * 16 * GB, 100 * 12));
when(csContext.getApplicationComparator()).thenReturn(
CapacityScheduler.applicationComparator);
- when(csContext.getQueueComparator()).thenReturn(
- CapacityScheduler.queueComparator);
+ when(csContext.getNonPartitionedQueueComparator()).thenReturn(
+ CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4459349c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 77eebdf..e4583d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -301,6 +302,9 @@ public class TestFifoScheduler {
scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
+ NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
+ nlm.init(new Configuration());
+ rmContext.setNodeLabelManager(nlm);
scheduler.setRMContext(rmContext);
scheduler.init(conf);