Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/18 00:35:55 UTC
[27/50] [abbrv] hadoop git commit: YARN-3361. CapacityScheduler side
changes to support non-exclusive node labels. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 882498a..eb64d43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -377,16 +375,29 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, ResourceLimits resourceLimits) {
- CSAssignment assignment =
- new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
- Set<String> nodeLabels = node.getLabels();
-
+ FiCaSchedulerNode node, ResourceLimits resourceLimits,
+ SchedulingMode schedulingMode) {
// if our queue cannot access this node, just return
- if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
- return assignment;
+ if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+ && !accessibleToPartition(node.getPartition())) {
+ return NULL_ASSIGNMENT;
+ }
+
+ // Check if this queue needs more resources; simply skip allocation if
+ // this queue doesn't need more resources.
+ if (!super.hasPendingResourceRequest(node.getPartition(),
+ clusterResource, schedulingMode)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip this queue=" + getQueuePath()
+ + ", because it doesn't need more resource, schedulingMode="
+ + schedulingMode.name() + " node-partition=" + node.getPartition());
+ }
+ return NULL_ASSIGNMENT;
}
+ CSAssignment assignment =
+ new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+
while (canAssign(clusterResource, node)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign containers to child-queue of "
@@ -396,15 +407,17 @@ public class ParentQueue extends AbstractCSQueue {
// Are we over maximum-capacity for this queue?
// This will also consider parent's limits and also continuous reservation
// looking
- if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
- minimumAllocation, Resources.createResource(getMetrics()
- .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+ if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+ resourceLimits, minimumAllocation, Resources.createResource(
+ getMetrics().getReservedMB(), getMetrics()
+ .getReservedVirtualCores()), schedulingMode)) {
break;
}
// Schedule
- CSAssignment assignedToChild =
- assignContainersToChildQueues(clusterResource, node, resourceLimits);
+ CSAssignment assignedToChild =
+ assignContainersToChildQueues(clusterResource, node, resourceLimits,
+ schedulingMode);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
@@ -413,7 +426,7 @@ public class ParentQueue extends AbstractCSQueue {
assignedToChild.getResource(), Resources.none())) {
// Track resource utilization for the parent-queue
super.allocateResource(clusterResource, assignedToChild.getResource(),
- nodeLabels);
+ node.getPartition());
// Track resource utilization in this pass of the scheduler
Resources
@@ -510,7 +523,8 @@ public class ParentQueue extends AbstractCSQueue {
}
private synchronized CSAssignment assignContainersToChildQueues(
- Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
+ Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
+ SchedulingMode schedulingMode) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@@ -523,12 +537,13 @@ public class ParentQueue extends AbstractCSQueue {
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
+ " stats: " + childQueue);
}
-
+
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits);
- assignment = childQueue.assignContainers(cluster, node, childLimits);
+ assignment = childQueue.assignContainers(cluster, node,
+ childLimits, schedulingMode);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
@@ -584,7 +599,7 @@ public class ParentQueue extends AbstractCSQueue {
// Book keeping
synchronized (this) {
super.releaseResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -653,7 +668,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -681,7 +696,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -701,7 +716,7 @@ public class ParentQueue extends AbstractCSQueue {
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.releaseResource(clusterResource,
rmContainer.getContainer().getResource(),
- node.getLabels());
+ node.getPartition());
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
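The ParentQueue changes above do three things: assignContainers gains a SchedulingMode parameter, the per-node label set is replaced by a single partition string (node.getPartition()), and two early exits run before the child-queue loop. Below is a distilled sketch of that control flow, assuming the surrounding ParentQueue/AbstractCSQueue context; resource limits and the child-queue iteration are elided, so this is not the complete method body:

    // Simplified sketch of the new assignContainers entry checks; names
    // follow the diff, but the body is abbreviated.
    public synchronized CSAssignment assignContainers(Resource clusterResource,
        FiCaSchedulerNode node, ResourceLimits resourceLimits,
        SchedulingMode schedulingMode) {
      // Exclusive mode: a queue that cannot access the node's partition
      // is skipped outright.
      if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
          && !accessibleToPartition(node.getPartition())) {
        return NULL_ASSIGNMENT;
      }
      // A queue with no pending demand for this partition is skipped too,
      // so the scheduler never descends into an idle subtree.
      if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
          schedulingMode)) {
        return NULL_ASSIGNMENT;
      }
      // ... canAssign() loop over child queues as before, now threading
      // schedulingMode through assignContainersToChildQueues() ...
      return NULL_ASSIGNMENT;
    }

The same partition string now also flows into allocateResource()/releaseResource(), replacing the old Set<String> of node labels.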
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
new file mode 100644
index 0000000..7e7dc37
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+/**
+ * Scheduling modes; see below for detailed explanations.
+ */
+public enum SchedulingMode {
+ /**
+ * <p>
+ * When a node has a partition (say partition=x), only applications in queues
+ * that can access partition=x AND that request partition=x resources get a
+ * chance to allocate on the node.
+ * </p>
+ *
+ * <p>
+ * When a node has no partition, only applications that request
+ * non-partitioned resources get a chance to allocate on the node.
+ * </p>
+ */
+ RESPECT_PARTITION_EXCLUSIVITY,
+
+ /**
+ * Only used when a node has a partition AND the partition isn't exclusive
+ * AND the application requests non-partitioned resources.
+ */
+ IGNORE_PARTITION_EXCLUSIVITY
+}
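The two enum values map onto the two passes the scheduler is expected to make over a node in a non-exclusive partition: first honoring exclusivity, then letting non-partitioned requests borrow idle labeled capacity. The driver below is a hypothetical sketch of that ordering, not code from this commit; the actual call site in CapacityScheduler is outside this hunk:

    // Hypothetical two-pass driver (assumed behavior): try exclusive
    // matching first, then share idle capacity of non-exclusive partitions.
    private void allocateOnNode(Resource clusterResource,
        FiCaSchedulerNode node, ParentQueue root, ResourceLimits limits) {
      // Pass 1: only requests targeting the node's partition (or the
      // default partition on unlabeled nodes) may match.
      CSAssignment assigned = root.assignContainers(clusterResource, node,
          limits, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
      // Pass 2: on an idle node of a non-exclusive partition, give
      // requests for non-partitioned resources a chance.
      if (Resources.equals(assigned.getResource(), Resources.none())
          && !node.getPartition().isEmpty()) {
        root.assignContainers(clusterResource, node, limits,
            SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
      }
    }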
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index 76ede39..9b7eb84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -277,6 +278,9 @@ public class Application {
} else {
request.setNumContainers(request.getNumContainers() + 1);
}
+ if (request.getNodeLabelExpression() == null) {
+ request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+ }
// Note this down for next interaction with ResourceManager
ask.remove(request);
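This normalization means a request with no label expression explicitly targets the default (empty-string) partition instead of carrying null into the scheduler, where the partition-keyed bookkeeping above expects a concrete key. A minimal standalone sketch of the same normalization (the request values here are arbitrary):

    // RMNodeLabelsManager.NO_LABEL is the empty-string default partition;
    // normalizing null avoids null keys in partition-indexed accounting.
    ResourceRequest request = ResourceRequest.newInstance(
        Priority.newInstance(1), ResourceRequest.ANY,
        Resource.newInstance(1024, 1), 1);
    if (request.getNodeLabelExpression() == null) {
      request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
    }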
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index f62fdb3..5c107aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -150,8 +150,14 @@ public class MockAM {
public AllocateResponse allocate(
String host, int memory, int numContainers,
List<ContainerId> releases, String labelExpression) throws Exception {
+ return allocate(host, memory, numContainers, 1, releases, labelExpression);
+ }
+
+ public AllocateResponse allocate(
+ String host, int memory, int numContainers, int priority,
+ List<ContainerId> releases, String labelExpression) throws Exception {
List<ResourceRequest> reqs =
- createReq(new String[] { host }, memory, 1, numContainers,
+ createReq(new String[] { host }, memory, priority, numContainers,
labelExpression);
return allocate(reqs, releases);
}
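The new overload lets tests ask for containers at an explicit priority; the original five-argument form now delegates to it with priority 1, so existing callers are unchanged. Illustrative usage (argument values are arbitrary):

    // Request two 1 GB containers at priority 2 with label expression "x".
    // am1.allocate("*", 1024, 2, releases, "x") would be the same request
    // at the default priority 1.
    AllocateResponse response = am1.allocate("*", 1024, 2, 2,
        new ArrayList<ContainerId>(), "x");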
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 06c6b32..f2b1d86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -200,10 +202,18 @@ public class MockRM extends ResourceManager {
public boolean waitForState(MockNM nm, ContainerId containerId,
RMContainerState containerState, int timeoutMillisecs) throws Exception {
+ return waitForState(Arrays.asList(nm), containerId, containerState,
+ timeoutMillisecs);
+ }
+
+ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
+ RMContainerState containerState, int timeoutMillisecs) throws Exception {
RMContainer container = getResourceScheduler().getRMContainer(containerId);
int timeoutSecs = 0;
while(container == null && timeoutSecs++ < timeoutMillisecs / 100) {
- nm.nodeHeartbeat(true);
+ for (MockNM nm : nms) {
+ nm.nodeHeartbeat(true);
+ }
container = getResourceScheduler().getRMContainer(containerId);
System.out.println("Waiting for container " + containerId + " to be allocated.");
Thread.sleep(100);
@@ -217,9 +227,11 @@ public class MockRM extends ResourceManager {
&& timeoutSecs++ < timeoutMillisecs / 100) {
System.out.println("Container : " + containerId + " State is : "
+ container.getState() + " Waiting for state : " + containerState);
- nm.nodeHeartbeat(true);
+ for (MockNM nm : nms) {
+ nm.nodeHeartbeat(true);
+ }
Thread.sleep(100);
-
+
if (timeoutMillisecs <= timeoutSecs * 100) {
return false;
}
@@ -650,11 +662,28 @@ public class MockRM extends ResourceManager {
am.waitForState(RMAppAttemptState.FINISHED);
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
}
+
+ @SuppressWarnings("rawtypes")
+ private static void waitForSchedulerAppAttemptAdded(
+ ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException {
+ int tick = 0;
+ // Wait for at most 5 sec
+ while (null == ((AbstractYarnScheduler) rm.getResourceScheduler())
+ .getApplicationAttempt(attemptId) && tick < 50) {
+ Thread.sleep(100);
+ if (tick % 10 == 0) {
+ System.out.println("waiting for SchedulerApplicationAttempt="
+ + attemptId + " added.");
+ }
+ tick++;
+ }
+ }
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt();
+ waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
System.out.println("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
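waitForState can now heartbeat a whole collection of NodeManagers while polling for a container state, which matters once a request may legitimately be satisfied on any node of a partition; the single-NM form simply wraps its argument in a list. launchAM additionally waits (up to 5 seconds) for the SchedulerApplicationAttempt to be registered before heartbeating, closing a startup race. Illustrative usage of the collection form (names and values are arbitrary):

    // Heartbeat both nodes until the container is allocated on one of
    // them, or give up after 10 seconds.
    boolean allocated = rm1.waitForState(Arrays.asList(nm1, nm2),
        containerId, RMContainerState.ALLOCATED, 10 * 1000);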
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 1ca5c97..46167ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -612,7 +612,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -632,7 +632,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource)); // Schedule to compute
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@@ -652,7 +652,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource)); // Schedule to compute
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
@@ -661,7 +661,7 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource)); // Schedule to compute
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 23b31fa..970a98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -133,7 +134,7 @@ public class TestChildQueueOrder {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource, null);
+ allocatedResource, RMNodeLabelsManager.NO_LABEL);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
@@ -145,7 +146,7 @@ public class TestChildQueueOrder {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue)
.assignContainers(eq(clusterResource), eq(node),
- any(ResourceLimits.class));
+ any(ResourceLimits.class), any(SchedulingMode.class));
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@@ -157,7 +158,7 @@ public class TestChildQueueOrder {
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node),
- any(ResourceLimits.class));
+ any(ResourceLimits.class), any(SchedulingMode.class));
doNothing().when(node).releaseContainer(any(Container.class));
}
@@ -241,6 +242,14 @@ public class TestChildQueueOrder {
CSQueue b = queues.get(B);
CSQueue c = queues.get(C);
CSQueue d = queues.get(D);
+
+ // Make a/b/c/d have >0 pending resources, so that allocation will continue.
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
+ a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
final String user_0 = "user_0";
@@ -275,7 +284,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
for(int i=0; i < 2; i++)
{
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
@@ -283,7 +292,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
for(int i=0; i < 3; i++)
{
@@ -292,7 +301,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
for(int i=0; i < 4; i++)
{
@@ -301,7 +310,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -335,7 +344,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -363,7 +372,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 3*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -390,7 +399,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -405,12 +414,14 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(d,b);
- allocationOrder.verify(d).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), any(ResourceLimits.class));
- allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), any(ResourceLimits.class));
+ allocationOrder.verify(d).assignContainers(eq(clusterResource),
+ any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+ any(SchedulingMode.class));
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+ any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
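The incPending() calls added near the top of this test exist because of the new early exit in ParentQueue: a queue whose ResourceUsage shows no pending demand for the partition is now skipped before any child is consulted, so the Mockito stubs would never fire without seeded demand. The seeding pattern, repeated per queue:

    // Seed 1 GB of pending demand so hasPendingResourceRequest() lets the
    // scheduler descend into the stubbed queue.
    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));

The verify() calls likewise gain an any(SchedulingMode.class) matcher to match the widened assignContainers signature.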
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 03b8f5c..54ba617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -51,9 +54,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
@@ -327,387 +334,4 @@ public class TestContainerAllocation {
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
-
- private Configuration getConfigurationWithQueueLabels(Configuration config) {
- CapacitySchedulerConfiguration conf =
- new CapacitySchedulerConfiguration(config);
-
- // Define top-level queues
- conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
- conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
- conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
-
- final String A = CapacitySchedulerConfiguration.ROOT + ".a";
- conf.setCapacity(A, 10);
- conf.setMaximumCapacity(A, 15);
- conf.setAccessibleNodeLabels(A, toSet("x"));
- conf.setCapacityByLabel(A, "x", 100);
-
- final String B = CapacitySchedulerConfiguration.ROOT + ".b";
- conf.setCapacity(B, 20);
- conf.setAccessibleNodeLabels(B, toSet("y"));
- conf.setCapacityByLabel(B, "y", 100);
-
- final String C = CapacitySchedulerConfiguration.ROOT + ".c";
- conf.setCapacity(C, 70);
- conf.setMaximumCapacity(C, 70);
- conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
-
- // Define 2nd-level queues
- final String A1 = A + ".a1";
- conf.setQueues(A, new String[] {"a1"});
- conf.setCapacity(A1, 100);
- conf.setMaximumCapacity(A1, 100);
- conf.setCapacityByLabel(A1, "x", 100);
-
- final String B1 = B + ".b1";
- conf.setQueues(B, new String[] {"b1"});
- conf.setCapacity(B1, 100);
- conf.setMaximumCapacity(B1, 100);
- conf.setCapacityByLabel(B1, "y", 100);
-
- final String C1 = C + ".c1";
- conf.setQueues(C, new String[] {"c1"});
- conf.setCapacity(C1, 100);
- conf.setMaximumCapacity(C1, 100);
-
- return conf;
- }
-
- private void checkTaskContainersHost(ApplicationAttemptId attemptId,
- ContainerId containerId, ResourceManager rm, String host) {
- YarnScheduler scheduler = rm.getRMContext().getScheduler();
- SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
-
- Assert.assertTrue(appReport.getLiveContainers().size() > 0);
- for (RMContainer c : appReport.getLiveContainers()) {
- if (c.getContainerId().equals(containerId)) {
- Assert.assertEquals(host, c.getAllocatedNode().getHost());
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private <E> Set<E> toSet(E... elements) {
- Set<E> set = Sets.newHashSet(elements);
- return set;
- }
-
- @Test (timeout = 300000)
- public void testContainerAllocationWithSingleUserLimits() throws Exception {
- final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
- mgr.init(conf);
-
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
- NodeId.newInstance("h2", 0), toSet("y")));
-
- // inject node label manager
- MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
- rm1.registerNode("h2:1234", 8000); // label = y
- MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-
- // launch an app to queue a1 (label = x), and check all container will
- // be allocated in h1
- RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- // A has only 10% of x, so it can only allocate one container in label=empty
- ContainerId containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
- Assert.assertTrue(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- // Cannot allocate 2nd label=empty container
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
- Assert.assertFalse(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
-
- // A has default user limit = 100, so it can use all resource in label = x
- // We can allocate floor(8000 / 1024) = 7 containers
- for (int id = 3; id <= 8; id++) {
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
- Assert.assertTrue(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- }
- rm1.close();
- }
-
- @Test(timeout = 300000)
- public void testContainerAllocateWithComplexLabels() throws Exception {
- /*
- * Queue structure:
- * root (*)
- * ________________
- * / \
- * a x(100%), y(50%) b y(50%), z(100%)
- * ________________ ______________
- * / / \
- * a1 (x,y) b1(no) b2(y,z)
- * 100% y = 100%, z = 100%
- *
- * Node structure:
- * h1 : x
- * h2 : y
- * h3 : y
- * h4 : z
- * h5 : NO
- *
- * Total resource:
- * x: 4G
- * y: 6G
- * z: 2G
- * *: 2G
- *
- * Resource of
- * a1: x=4G, y=3G, NO=0.2G
- * b1: NO=0.9G (max=1G)
- * b2: y=3, z=2G, NO=0.9G (max=1G)
- *
- * Each node can only allocate two containers
- */
-
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
- toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
- NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
- toSet("z"), NodeId.newInstance("h5", 0),
- RMNodeLabelsManager.EMPTY_STRING_SET));
-
- // inject node label manager
- MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 2048);
- MockNM nm2 = rm1.registerNode("h2:1234", 2048);
- MockNM nm3 = rm1.registerNode("h3:1234", 2048);
- MockNM nm4 = rm1.registerNode("h4:1234", 2048);
- MockNM nm5 = rm1.registerNode("h5:1234", 2048);
-
- ContainerId containerId;
-
- // launch an app to queue a1 (label = x), and check all container will
- // be allocated in h1
- RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- // request a container (label = y). can be allocated on nm2
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
- Assert.assertTrue(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
- "h2");
-
- // launch an app to queue b1 (label = y), and check all container will
- // be allocated in h5
- RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
-
- // request a container for AM, will succeed
- // and now b1's queue capacity will be used, cannot allocate more containers
- // (Maximum capacity reached)
- am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm4, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertFalse(rm1.waitForState(nm5, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
-
- // launch an app to queue b2
- RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
- MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
-
- // request a container. try to allocate on nm1 (label = x) and nm3 (label =
- // y,z). Will successfully allocate on nm3
- am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
- containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
- "h3");
-
- // try to allocate container (request label = z) on nm4 (label = y,z).
- // Will successfully allocate on nm4 only.
- am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
- containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
- Assert.assertTrue(rm1.waitForState(nm4, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
- "h4");
-
- rm1.close();
- }
-
- @Test (timeout = 120000)
- public void testContainerAllocateWithLabels() throws Exception {
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
- NodeId.newInstance("h2", 0), toSet("y")));
-
- // inject node label manager
- MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
- MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
- MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-
- ContainerId containerId;
-
- // launch an app to queue a1 (label = x), and check all container will
- // be allocated in h1
- RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
-
- // request a container.
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
- "h1");
-
- // launch an app to queue b1 (label = y), and check all container will
- // be allocated in h2
- RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
-
- // request a container.
- am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
- containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
- "h2");
-
- // launch an app to queue c1 (label = ""), and check all container will
- // be allocated in h3
- RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
- MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
-
- // request a container.
- am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
- "h3");
-
- rm1.close();
- }
-
- @Test (timeout = 120000)
- public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
- // This test is pretty much similar to testContainerAllocateWithLabel.
- // Difference is, this test doesn't specify label expression in ResourceRequest,
- // instead, it uses default queue label expression
-
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
- NodeId.newInstance("h2", 0), toSet("y")));
-
- // inject node label manager
- MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
- MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
- MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-
- ContainerId containerId;
-
- // launch an app to queue a1 (label = x), and check all container will
- // be allocated in h1
- RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- // request a container.
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
- "h1");
-
- // launch an app to queue b1 (label = y), and check all container will
- // be allocated in h2
- RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // request a container.
- am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
- "h2");
-
- // launch an app to queue c1 (label = ""), and check all container will
- // be allocated in h3
- RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
- MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
-
- // request a container.
- am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
- "h3");
-
- rm1.close();
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 972cabb..0b5250b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -351,7 +351,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(
(int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
@@ -487,7 +487,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -498,7 +498,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -507,7 +507,7 @@ public class TestLeafQueue {
// Can't allocate 3rd due to user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -517,7 +517,7 @@ public class TestLeafQueue {
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -526,7 +526,7 @@ public class TestLeafQueue {
// One more should work, for app_1, due to user-limit-factor
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -537,7 +537,7 @@ public class TestLeafQueue {
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -653,21 +653,21 @@ public class TestLeafQueue {
// 1 container to user_0
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
@@ -719,10 +719,10 @@ public class TestLeafQueue {
1, qb.getActiveUsersManager().getNumActiveUsers());
//get headroom
qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
//maxqueue 16G, userlimit 13G, - 4G used = 9G
assertEquals(9*GB,app_0.getHeadroom().getMemory());
@@ -739,10 +739,10 @@ public class TestLeafQueue {
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_2, user_1);
qb.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8*GB, qb.getUsedResources().getMemory());
assertEquals(4*GB, app_0.getCurrentConsumption().getMemory());
@@ -782,12 +782,12 @@ public class TestLeafQueue {
qb.submitApplicationAttempt(app_1, user_0);
qb.submitApplicationAttempt(app_3, user_1);
qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, qb.getUsedResources().getMemory());
//maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
assertEquals(5*GB, app_3.getHeadroom().getMemory());
@@ -803,13 +803,13 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
u0Priority, recordFactory)));
qb.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
//app3 is user1, active from last test case
@@ -876,7 +876,7 @@ public class TestLeafQueue {
priority, recordFactory)));
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -893,7 +893,7 @@ public class TestLeafQueue {
priority, recordFactory)));
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -982,7 +982,7 @@ public class TestLeafQueue {
// 1 container to user_0
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -993,7 +993,7 @@ public class TestLeafQueue {
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1010,7 +1010,7 @@ public class TestLeafQueue {
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1024,7 +1024,7 @@ public class TestLeafQueue {
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@@ -1095,7 +1095,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1103,7 +1103,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1111,7 +1111,7 @@ public class TestLeafQueue {
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1130,7 +1130,7 @@ public class TestLeafQueue {
// user_0 is at limit in spite of high user-limit-factor
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1140,7 +1140,7 @@ public class TestLeafQueue {
// Now allocations should go to app_0 since
// user_0 is at user-limit not above it
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1151,7 +1151,7 @@ public class TestLeafQueue {
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1163,7 +1163,7 @@ public class TestLeafQueue {
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1172,7 +1172,7 @@ public class TestLeafQueue {
// Now we should assign to app_3 again since user_2 is under user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
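
Flipping setMaxCapacity between 0.5f and 1.0f above toggles whether the queue bound, rather than the per-user limit, is what blocks the next allocation. Headroom is roughly the tighter of the two bounds less current consumption; HeadroomSketch below is a hypothetical simplification:

// Toy headroom computation; illustrative only. The real formula also
// folds in cluster size, pending resources and per-label capacities.
class HeadroomSketch {
  static int headroomMb(int userLimitMb, int queueMaxCapMb,
      int userConsumedMb) {
    // The tighter of the per-user and per-queue bounds applies; a
    // user already at max-cap sees zero headroom, never negative.
    return Math.max(0, Math.min(userLimitMb, queueMaxCapMb) - userConsumedMb);
  }
}
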
@@ -1272,7 +1272,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1283,7 +1283,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc; note that
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1292,7 +1292,7 @@ public class TestLeafQueue {
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1309,7 +1309,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1326,7 +1326,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
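
The jump from 2 GB to 6 GB used while app_1 still consumes 0 GB is a reservation: app_1's 4 GB ask does not fit beside app_0's containers, so the scheduler books the space and converts it only after enough releases. NodeSketch below is a hypothetical capacity-only model of that flow; the divergence from the first KILL above, where limits rather than capacity still block app_1, is noted in the comments:

// Schematic reservation flow; illustrative only. The real path runs
// through RMContainer state transitions on FiCaSchedulerNode, and the
// scheduler re-checks user/queue limits before completing a
// reservation, which is why the first KILL above still leaves app_1
// at 0 GB even though the node has room.
class NodeSketch {
  final int capacityMb;
  int allocatedMb;
  Integer reservedMb; // null when nothing is reserved here

  NodeSketch(int capacityMb) { this.capacityMb = capacityMb; }

  // Try to place a request; book a reservation if it does not fit.
  String assign(int requestMb) {
    if (allocatedMb + requestMb <= capacityMb) {
      allocatedMb += requestMb;
      return "ALLOCATED";
    }
    reservedMb = requestMb; // reserved space still counts as "used"
    return "RESERVED";
  }

  // Releasing a container can let the pending reservation complete.
  void release(int mb) {
    allocatedMb -= mb;
    if (reservedMb != null && allocatedMb + reservedMb <= capacityMb) {
      allocatedMb += reservedMb; // reservation -> real allocation
      reservedMb = null;
    }
  }
}
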
@@ -1394,7 +1394,7 @@ public class TestLeafQueue {
// Start testing...
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1404,7 +1404,7 @@ public class TestLeafQueue {
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1418,7 +1418,7 @@ public class TestLeafQueue {
doReturn(-1).when(a).getNodeLocalityDelay();
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1435,7 +1435,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
@@ -1504,7 +1504,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1512,14 +1512,14 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1534,7 +1534,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1544,7 +1544,7 @@ public class TestLeafQueue {
// Re-reserve
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1554,7 +1554,7 @@ public class TestLeafQueue {
// Try to schedule on node_1 now, should *move* the reservation
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1572,7 +1572,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
CSAssignment assignment = a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1644,7 +1644,7 @@ public class TestLeafQueue {
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
@@ -1653,7 +1653,7 @@ public class TestLeafQueue {
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority));
@@ -1662,7 +1662,7 @@ public class TestLeafQueue {
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority));
@@ -1672,7 +1672,7 @@ public class TestLeafQueue {
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
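
The four off-switch offers above encode delay scheduling: an OFF_SWITCH container is granted only once missed opportunities reach the number of containers outstanding, and only a local assignment resets the counter. DelaySchedulingSketch is a hypothetical reduction of that gate:

// Simplified delay-scheduling gate; illustrative only. The real
// LeafQueue also applies node-locality-delay before RACK_LOCAL.
class DelaySchedulingSketch {
  int schedulingOpportunities; // nodes offered at this priority so far

  boolean canAssignOffSwitch(int requiredContainers) {
    schedulingOpportunities++; // count this offer first
    int missedOpportunities = schedulingOpportunities - 1;
    // Matches the test: with 3 required containers, the 4th offer
    // (missedOpportunities = 3) is the first OFF_SWITCH success, and
    // the counter is NOT reset afterwards.
    return missedOpportunities >= requiredContainers;
  }

  void onNodeLocalAssignment() {
    // A node-local hit resets the counter, as the "should reset"
    // assertions above verify.
    schedulingOpportunities = 0;
  }
}
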
@@ -1681,7 +1681,7 @@ public class TestLeafQueue {
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1690,7 +1690,7 @@ public class TestLeafQueue {
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1719,14 +1719,14 @@ public class TestLeafQueue {
// Shouldn't assign RACK_LOCAL yet
assignment = a.assignContainers(clusterResource, node_3,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1808,7 +1808,7 @@ public class TestLeafQueue {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
@@ -1821,7 +1821,7 @@ public class TestLeafQueue {
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
@@ -1833,7 +1833,7 @@ public class TestLeafQueue {
// Another off-switch, now we should allocate OFF_SWITCH P1
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
@@ -1845,7 +1845,7 @@ public class TestLeafQueue {
// Now, NODE_LOCAL for P1
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1857,7 +1857,7 @@ public class TestLeafQueue {
// Now, OFF_SWITCH for P2
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1934,7 +1934,7 @@ public class TestLeafQueue {
// NODE_LOCAL - node_0_0
a.assignContainers(clusterResource, node_0_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1943,7 +1943,7 @@ public class TestLeafQueue {
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
a.assignContainers(clusterResource, node_1_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
@@ -1960,7 +1960,7 @@ public class TestLeafQueue {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
a.assignContainers(clusterResource, node_0_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
@@ -1968,7 +1968,7 @@ public class TestLeafQueue {
// NODE_LOCAL - node_1_0
a.assignContainers(clusterResource, node_1_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
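
The two "no allocation even though it's node/rack local" cases come down to outstanding-request counting: a locality level is considered only while its pending count (and the ANY-level count) is non-zero. RequestCountGate below is a hypothetical condensation of that check:

// Rough outstanding-request gate; illustrative only. A locality level
// is only worth offering while pending container counts are non-zero.
class RequestCountGate {
  static boolean worthOffering(int numAnyPending, int numRackPending,
      boolean checkingRackLevel) {
    if (numAnyPending <= 0) {
      return false; // required(ANY) == 0: nothing outstanding at all
    }
    // required(rack) == 0: the node is local to a rack that no longer
    // has pending requests, so it is skipped despite being "local".
    return !checkingRackLevel || numRackPending > 0;
  }
}
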
@@ -2221,7 +2221,7 @@ public class TestLeafQueue {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
a.assignContainers(clusterResource, node_0_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2244,7 +2244,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
a.assignContainers(clusterResource, node_1_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2275,7 +2275,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
a.assignContainers(clusterResource, node_1_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2304,7 +2304,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
a.assignContainers(clusterResource, node_1_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2331,7 +2331,7 @@ public class TestLeafQueue {
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
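
These hunks exercise two independent gates that sit in front of any allocation: the request's relax-locality flag and the application-level blacklist of hosts and racks. PlacementGates below sketches both checks under those assumptions; it is not the scheduler's code:

import java.util.Set;

// Condensed pre-allocation gates; illustrative only.
class PlacementGates {
  // relaxLocality=false on the rack-level request pins the app to the
  // explicitly named nodes; other nodes in that rack are skipped.
  static boolean passesRelaxLocality(boolean rackRelaxLocality,
      boolean nodeExplicitlyRequested) {
    return rackRelaxLocality || nodeExplicitlyRequested;
  }

  // A node is unusable when its host or its rack is blacklisted.
  static boolean passesBlacklist(Set<String> blacklist,
      String host, String rack) {
    return !blacklist.contains(host) && !blacklist.contains(rack);
  }
}
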
@@ -2362,7 +2362,7 @@ public class TestLeafQueue {
// host_1_1: 7G
a.assignContainers(clusterResource, node_1_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
@@ -2445,7 +2445,7 @@ public class TestLeafQueue {
try {
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "
+ "forget to set off-switch request should be handled");