You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/03/17 18:24:45 UTC
[1/2] hadoop git commit: YARN-3243. CapacityScheduler should pass
headroom from parent to children to make sure ParentQueue obeys its capacity
limits. Contributed by Wangda Tan.
Repository: hadoop
Updated Branches:
refs/heads/trunk a89b087c4 -> 487374b7f
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index a5a2e5f..972cabb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -350,8 +350,8 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
- a.assignContainers(clusterResource, node_0, false,
- new ResourceLimits(clusterResource));
+ a.assignContainers(clusterResource, node_0, new ResourceLimits(
+ clusterResource));
assertEquals(
(int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
@@ -486,7 +486,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -497,7 +497,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -506,7 +506,7 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Can't allocate 3rd due to user-limit
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -516,7 +516,7 @@ public class TestLeafQueue {
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -525,7 +525,7 @@ public class TestLeafQueue {
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
// One more should work, for app_1, due to user-limit-factor
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -536,8 +536,8 @@ public class TestLeafQueue {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
- a.assignContainers(clusterResource, node_0, false,
- new ResourceLimits(clusterResource));
+ a.assignContainers(clusterResource, node_0, new ResourceLimits(
+ clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -652,21 +652,21 @@ public class TestLeafQueue {
// recordFactory)));
// 1 container to user_0
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -718,7 +718,7 @@ public class TestLeafQueue {
assertEquals("There should only be 1 active user!",
1, qb.getActiveUsersManager().getNumActiveUsers());
//get headroom
- qb.assignContainers(clusterResource, node_0, false,
+ qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@@ -738,7 +738,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_2, user_1);
- qb.assignContainers(clusterResource, node_1, false,
+ qb.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@@ -781,9 +781,9 @@ public class TestLeafQueue {
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_1, user_0);
qb.submitApplicationAttempt(app_3, user_1);
- qb.assignContainers(clusterResource, node_0, false,
+ qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
- qb.assignContainers(clusterResource, node_0, false,
+ qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
@@ -802,7 +802,7 @@ public class TestLeafQueue {
app_4.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
u0Priority, recordFactory)));
- qb.assignContainers(clusterResource, node_1, false,
+ qb.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@@ -875,7 +875,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -892,7 +892,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -981,7 +981,7 @@ public class TestLeafQueue {
1, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -992,7 +992,7 @@ public class TestLeafQueue {
// the application is not yet active
// Again one to user_0 since he hasn't exceeded user limit yet
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1009,7 +1009,7 @@ public class TestLeafQueue {
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1023,7 +1023,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@@ -1094,7 +1094,7 @@ public class TestLeafQueue {
*/
// Only 1 container
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1102,7 +1102,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1110,7 +1110,7 @@ public class TestLeafQueue {
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1129,7 +1129,7 @@ public class TestLeafQueue {
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
a.setUserLimitFactor(10);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1139,7 +1139,7 @@ public class TestLeafQueue {
// Now allocations should goto app_0 since
// user_0 is at user-limit not above it
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -1150,7 +1150,7 @@ public class TestLeafQueue {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -1162,7 +1162,7 @@ public class TestLeafQueue {
// Now, allocations should goto app_3 since it's under user-limit
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -1171,7 +1171,7 @@ public class TestLeafQueue {
assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
// Now we should assign to app_3 again since user_2 is under user-limit
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -1271,7 +1271,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1282,7 +1282,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1291,7 +1291,7 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now, reservation should kick in for app_1
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1308,7 +1308,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1325,7 +1325,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1393,7 +1393,7 @@ public class TestLeafQueue {
// Start testing...
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1403,7 +1403,7 @@ public class TestLeafQueue {
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Now, reservation should kick in for app_1
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1417,7 +1417,7 @@ public class TestLeafQueue {
// We do not need locality delay here
doReturn(-1).when(a).getNodeLocalityDelay();
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1434,7 +1434,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1503,7 +1503,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1511,14 +1511,14 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1533,7 +1533,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1543,7 +1543,7 @@ public class TestLeafQueue {
assertEquals(1, app_1.getReReservations(priority));
// Re-reserve
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1553,7 +1553,7 @@ public class TestLeafQueue {
assertEquals(2, app_1.getReReservations(priority));
// Try to schedule on node_1 now, should *move* the reservation
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1571,7 +1571,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
- CSAssignment assignment = a.assignContainers(clusterResource, node_0, false,
+ CSAssignment assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1643,7 +1643,7 @@ public class TestLeafQueue {
CSAssignment assignment = null;
// Start with off switch, shouldn't allocate due to delay scheduling
- assignment = a.assignContainers(clusterResource, node_2, false,
+ assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1652,7 +1652,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
- assignment = a.assignContainers(clusterResource, node_2, false,
+ assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1661,7 +1661,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
- assignment = a.assignContainers(clusterResource, node_2, false,
+ assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1671,7 +1671,7 @@ public class TestLeafQueue {
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
- assignment = a.assignContainers(clusterResource, node_2, false,
+ assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1680,7 +1680,7 @@ public class TestLeafQueue {
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
- assignment = a.assignContainers(clusterResource, node_0, false,
+ assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1689,7 +1689,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
- assignment = a.assignContainers(clusterResource, node_1, false,
+ assignment = a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1718,14 +1718,14 @@ public class TestLeafQueue {
doReturn(1).when(a).getNodeLocalityDelay();
// Shouldn't assign RACK_LOCAL yet
- assignment = a.assignContainers(clusterResource, node_3, false,
+ assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
- assignment = a.assignContainers(clusterResource, node_3, false,
+ assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1807,7 +1807,7 @@ public class TestLeafQueue {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
- a.assignContainers(clusterResource, node_2, false,
+ a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1820,7 +1820,7 @@ public class TestLeafQueue {
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
- a.assignContainers(clusterResource, node_2, false,
+ a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1832,7 +1832,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate OFF_SWITCH P1
- a.assignContainers(clusterResource, node_2, false,
+ a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1844,7 +1844,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, DATA_LOCAL for P1
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1856,7 +1856,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, OFF_SWITCH for P2
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1933,7 +1933,7 @@ public class TestLeafQueue {
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
- a.assignContainers(clusterResource, node_0_0, false,
+ a.assignContainers(clusterResource, node_0_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1942,7 +1942,7 @@ public class TestLeafQueue {
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
- a.assignContainers(clusterResource, node_1_0, false,
+ a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1959,7 +1959,7 @@ public class TestLeafQueue {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
- a.assignContainers(clusterResource, node_0_1, false,
+ a.assignContainers(clusterResource, node_0_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1967,7 +1967,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority));
// NODE_LOCAL - node_1
- a.assignContainers(clusterResource, node_1_0, false,
+ a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2220,7 +2220,7 @@ public class TestLeafQueue {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
- a.assignContainers(clusterResource, node_0_1, false,
+ a.assignContainers(clusterResource, node_0_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2243,7 +2243,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
- a.assignContainers(clusterResource, node_1_1, false,
+ a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2274,7 +2274,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
- a.assignContainers(clusterResource, node_1_1, false,
+ a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2303,7 +2303,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
- a.assignContainers(clusterResource, node_1_1, false,
+ a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2330,7 +2330,7 @@ public class TestLeafQueue {
// Blacklist: < host_0_0 > <----
// Now, should allocate since RR(rack_1) = relax: true
- a.assignContainers(clusterResource, node_1_1, false,
+ a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2361,7 +2361,7 @@ public class TestLeafQueue {
// host_1_0: 8G
// host_1_1: 7G
- a.assignContainers(clusterResource, node_1_0, false,
+ a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2444,7 +2444,7 @@ public class TestLeafQueue {
recordFactory)));
try {
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 4f89386..7da1c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -156,7 +156,7 @@ public class TestParentQueue {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).when(queue)
- .assignContainers(eq(clusterResource), eq(node), eq(false),
+ .assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
// Mock the node's resource availability
@@ -167,8 +167,7 @@ public class TestParentQueue {
return new CSAssignment(allocatedResource, type);
}
- }).
-when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
+ }).when(queue).assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
}
@@ -232,7 +231,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@@ -240,13 +239,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
- root.assignContainers(clusterResource, node_1, false,
+ root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -254,13 +253,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// since A has 2/6G while B has 2/14G
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -268,13 +267,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// since A has 3/6G while B has 4/14G
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@@ -282,13 +281,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// since A has 3/6G while B has 8/14G
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
- root.assignContainers(clusterResource, node_1, false,
+ root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@@ -405,6 +404,22 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
@Test
public void testMultiLevelQueues() throws Exception {
+ /*
+ * Structure of queue:
+ * Root
+ * ____________
+ * / | \ \
+ * A B C D
+ * / | / | \ \
+ * A1 A2 B1 B2 B3 C1
+ * \
+ * C11
+ * \
+ * C111
+ * \
+ * C1111
+ */
+
// Setup queue configs
setupMultiLevelQueues(csConf);
@@ -449,7 +464,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 0*GB, clusterResource);
@@ -462,7 +477,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
stubQueueAllocation(a, clusterResource, node_1, 0*GB);
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
- root.assignContainers(clusterResource, node_1, false,
+ root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -474,15 +489,15 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -501,17 +516,17 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
- root.assignContainers(clusterResource, node_2, false,
+ root.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -611,7 +626,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@@ -620,13 +635,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
- root.assignContainers(clusterResource, node_1, false,
+ root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -635,13 +650,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// However, since B returns off-switch, A won't get an opportunity
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -680,7 +695,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// Simulate B3 returning a container on node_0
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(b2, 0*GB, clusterResource);
verifyQueueMetrics(b3, 1*GB, clusterResource);
@@ -689,13 +704,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
- root.assignContainers(clusterResource, node_1, false,
+ root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@@ -704,13 +719,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// However, since B3 returns off-switch, B2 won't get an opportunity
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
- root.assignContainers(clusterResource, node_0, false,
+ root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index b3250e5..c5b7587 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -265,7 +265,7 @@ public class TestReservations {
// Start testing...
// Only AM
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -277,7 +277,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -289,7 +289,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -304,7 +304,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -320,7 +320,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// assign reducer to node 2
- a.assignContainers(clusterResource, node_2, false,
+ a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -337,7 +337,7 @@ public class TestReservations {
// node_1 heartbeat and unreserves from node_0 in order to allocate
// on node_1
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
@@ -421,7 +421,7 @@ public class TestReservations {
// Start testing...
// Only AM
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -433,7 +433,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -445,7 +445,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -460,7 +460,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -476,7 +476,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// assign reducer to node 2
- a.assignContainers(clusterResource, node_2, false,
+ a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -493,7 +493,7 @@ public class TestReservations {
// node_1 heartbeat and won't unreserve from node_0, potentially stuck
// if AM doesn't handle
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -569,7 +569,7 @@ public class TestReservations {
// Start testing...
// Only AM
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -580,7 +580,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -591,7 +591,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -605,7 +605,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -620,7 +620,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// could allocate but told need to unreserve first
- a.assignContainers(clusterResource, node_1, true,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -652,6 +652,8 @@ public class TestReservations {
String host_1 = "host_1";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
8 * GB);
+
+ Resource clusterResource = Resources.createResource(2 * 8 * GB);
// Setup resource-requests
Priority priorityMap = TestUtils.createMockPriority(5);
@@ -681,23 +683,28 @@ public class TestReservations {
node_0.getNodeID(), "user", rmContext);
// no reserved containers
- NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+ NodeId unreserveId =
+ app_0.getNodeIdToUnreserve(priorityMap, capability,
+ cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// no reserved containers - reserve then unreserve
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
app_0.unreserve(node_0, priorityMap);
- unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+ unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
+ cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// no container large enough is reserved
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
- unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+ unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
+ cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// reserve one that is now large enough
app_0.reserve(node_1, priorityMap, rmContainer, container);
- unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+ unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
+ cs.getResourceCalculator(), clusterResource);
assertEquals(node_1.getNodeID(), unreserveId);
}
@@ -741,14 +748,14 @@ public class TestReservations {
// nothing reserved
boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
- node_1, app_0, priorityMap, capability);
+ node_1, app_0, priorityMap, capability, capability);
assertFalse(res);
// reserved but scheduler doesn't know about that node.
app_0.reserve(node_1, priorityMap, rmContainer, container);
node_1.reserveResource(app_0, priorityMap, rmContainer);
res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
- priorityMap, capability);
+ priorityMap, capability, capability);
assertFalse(res);
}
@@ -815,7 +822,7 @@ public class TestReservations {
// Start testing...
// Only AM
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -826,7 +833,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -837,7 +844,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -852,14 +859,15 @@ public class TestReservations {
// absoluteMaxCapacity
Resource capability = Resources.createResource(32 * GB, 0);
boolean res =
- a.canAssignToThisQueue(clusterResource, capability,
- CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
+ a.canAssignToThisQueue(clusterResource,
+ CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+ clusterResource), capability, Resources.none());
assertFalse(res);
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -872,14 +880,17 @@ public class TestReservations {
capability = Resources.createResource(5 * GB, 0);
res =
- a.canAssignToThisQueue(clusterResource, capability,
- CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
+ a.canAssignToThisQueue(clusterResource,
+ CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+ clusterResource), capability, Resources
+ .createResource(5 * GB));
assertTrue(res);
// tell to not check reservations
res =
- a.canAssignToThisQueue(clusterResource, capability,
- CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
+ a.canAssignToThisQueue(clusterResource,
+ CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+ clusterResource), capability, Resources.none());
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
@@ -887,13 +898,16 @@ public class TestReservations {
// should return false no matter what checkReservations is passed
// in since feature is off
res =
- a.canAssignToThisQueue(clusterResource, capability,
- CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
+ a.canAssignToThisQueue(clusterResource,
+ CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+ clusterResource), capability, Resources.none());
assertFalse(res);
res =
- a.canAssignToThisQueue(clusterResource, capability,
- CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
+ a.canAssignToThisQueue(clusterResource,
+ CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+ clusterResource), capability, Resources
+ .createResource(5 * GB));
assertFalse(res);
}
@@ -985,15 +999,15 @@ public class TestReservations {
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
priorityAM, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
- .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
- priorityReduce, recordFactory)));
- app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
priorityMap, recordFactory)));
+ app_0.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+ priorityReduce, recordFactory)));
// Start testing...
// Only AM
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1004,7 +1018,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1015,7 +1029,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1029,7 +1043,7 @@ public class TestReservations {
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1117,18 +1131,18 @@ public class TestReservations {
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
priorityAM, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
- .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
- priorityReduce, recordFactory)));
- app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
priorityMap, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
+ priorityReduce, recordFactory)));
+ app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
priorityLast, recordFactory)));
// Start testing...
// Only AM
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1140,7 +1154,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1152,7 +1166,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
- a.assignContainers(clusterResource, node_1, false,
+ a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1164,38 +1178,41 @@ public class TestReservations {
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
- // try to assign reducer (5G on node 0), but tell it
- // it has to unreserve. No room to allocate and shouldn't reserve
- // since nothing currently reserved.
- a.assignContainers(clusterResource, node_0, true,
- new ResourceLimits(clusterResource));
+ // try to assign reducer (5G on node 0), but tell it its resource limit <
+ // used (8G) + required (5G). It will not reserve since it would have to
+ // unreserve some resource. Even with continuous reservation looking, we
+ // don't allow unreserving a resource in order to reserve a container.
+ a.assignContainers(clusterResource, node_0,
+ new ResourceLimits(Resources.createResource(10 * GB)));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
assertEquals(16 * GB, a.getMetrics().getAvailableMB());
- assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+ // app_0's headroom = limit (10G) - used (8G) = 2G
+ assertEquals(2 * GB, app_0.getHeadroom().getMemory());
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
- // try to assign reducer (5G on node 2), but tell it
- // it has to unreserve. Has room but shouldn't reserve
- // since nothing currently reserved.
- a.assignContainers(clusterResource, node_2, true,
- new ResourceLimits(clusterResource));
+ // try to assign reducer (5G on node 2), but tell it its resource limit <
+ // used (8G) + required (5G). It will not reserve since it would have to
+ // unreserve some resource. Unfortunately, there's nothing to unreserve.
+ a.assignContainers(clusterResource, node_2,
+ new ResourceLimits(Resources.createResource(10 * GB)));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
assertEquals(16 * GB, a.getMetrics().getAvailableMB());
- assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+ // app_0's headroom = limit (10G) - used (8G) = 2G
+ assertEquals(2 * GB, app_0.getHeadroom().getMemory());
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// let it assign 5G to node_2
- a.assignContainers(clusterResource, node_2, false,
+ a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1208,7 +1225,7 @@ public class TestReservations {
assertEquals(5 * GB, node_2.getUsedResource().getMemory());
// reserve 8G node_0
- a.assignContainers(clusterResource, node_0, false,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1223,7 +1240,7 @@ public class TestReservations {
// try to assign (8G on node 2). No room to allocate,
// continued to try due to having reservation above,
// but hits queue limits so can't reserve anymore.
- a.assignContainers(clusterResource, node_2, false,
+ a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
[2/2] hadoop git commit: YARN-3243. CapacityScheduler should pass
headroom from parent to children to make sure ParentQueue obey its capacity
limits. Contributed by Wangda Tan.
Posted by ji...@apache.org.
YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/487374b7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/487374b7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/487374b7
Branch: refs/heads/trunk
Commit: 487374b7fe0c92fc7eb1406c568952722b5d5b15
Parents: a89b087
Author: Jian He <ji...@apache.org>
Authored: Tue Mar 17 10:22:15 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue Mar 17 10:24:23 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/capacity/AbstractCSQueue.java | 112 ++++++-
.../scheduler/capacity/CSQueue.java | 4 +-
.../scheduler/capacity/CapacityScheduler.java | 33 ++-
.../scheduler/capacity/LeafQueue.java | 292 +++++++------------
.../scheduler/capacity/ParentQueue.java | 140 +++------
.../scheduler/common/fica/FiCaSchedulerApp.java | 16 +-
.../capacity/TestApplicationLimits.java | 8 +-
.../capacity/TestCapacityScheduler.java | 59 ++++
.../scheduler/capacity/TestChildQueueOrder.java | 25 +-
.../scheduler/capacity/TestLeafQueue.java | 142 ++++-----
.../scheduler/capacity/TestParentQueue.java | 97 +++---
.../scheduler/capacity/TestReservations.java | 147 +++++-----
13 files changed, 561 insertions(+), 517 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 82934ad..f5b72d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -56,6 +56,9 @@ Release 2.8.0 - UNRELEASED
IMPROVEMENTS
+ YARN-3243. CapacityScheduler should pass headroom from parent to children
+ to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index d800709..4e53060 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
@@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
+ private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
CSQueue parent;
final String queueName;
@@ -406,21 +411,102 @@ public abstract class AbstractCSQueue implements CSQueue {
parentQ.getPreemptionDisabled());
}
- protected Resource getCurrentResourceLimit(Resource clusterResource,
- ResourceLimits currentResourceLimits) {
+ private Resource getCurrentLimitResource(String nodeLabel,
+ Resource clusterResource, ResourceLimits currentResourceLimits) {
/*
- * Queue's max available resource = min(my.max, my.limit)
- * my.limit is set by my parent, considered used resource of my siblings
+ * Current limit resource. For labeled resources: limit = queue-max-resource
+ * (TODO: this part needs updating once labeled limits are supported). For
+ * non-labeled resources: limit = min(queue-max-resource,
+ * limit-set-by-parent).
*/
Resource queueMaxResource =
- Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
- queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
- Resource queueCurrentResourceLimit =
- Resources.min(resourceCalculator, clusterResource, queueMaxResource,
- currentResourceLimits.getLimit());
- queueCurrentResourceLimit =
- Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
- minimumAllocation);
- return queueCurrentResourceLimit;
+ Resources.multiplyAndNormalizeDown(resourceCalculator,
+ labelManager.getResourceByLabel(nodeLabel, clusterResource),
+ queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
+ if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
+ return Resources.min(resourceCalculator, clusterResource,
+ queueMaxResource, currentResourceLimits.getLimit());
+ }
+ return queueMaxResource;
+ }
+
+ synchronized boolean canAssignToThisQueue(Resource clusterResource,
+ Set<String> nodeLabels, ResourceLimits currentResourceLimits,
+ Resource nowRequired, Resource resourceCouldBeUnreserved) {
+ // Get the labels this queue can access: (nodeLabel AND queueLabel)
+ Set<String> labelCanAccess;
+ if (null == nodeLabels || nodeLabels.isEmpty()) {
+ labelCanAccess = new HashSet<String>();
+ // Any queue can always access any node without label
+ labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+ } else {
+ labelCanAccess = new HashSet<String>(
+ accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
+ : Sets.intersection(accessibleLabels, nodeLabels));
+ }
+
+ for (String label : labelCanAccess) {
+ // New total resource = used + required
+ Resource newTotalResource =
+ Resources.add(queueUsage.getUsed(label), nowRequired);
+
+ Resource currentLimitResource =
+ getCurrentLimitResource(label, clusterResource, currentResourceLimits);
+
+ // if continuous reservation looking is enabled, check whether we could
+ // potentially use this node instead of a reserved node if the application
+ // has reserved containers.
+ // TODO, now only consider reservation cases when the node has no label
+ if (this.reservationsContinueLooking
+ && label.equals(RMNodeLabelsManager.NO_LABEL)
+ && Resources.greaterThan(resourceCalculator, clusterResource,
+ resourceCouldBeUnreserved, Resources.none())) {
+ // resource-without-reserved = used + required - reserved
+ Resource newTotalWithoutReservedResource =
+ Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+
+ // when total-used-without-reserved-resource < currentLimit, we still
+ // have a chance to allocate on this node by unreserving some containers
+ if (Resources.lessThan(resourceCalculator, clusterResource,
+ newTotalWithoutReservedResource, currentLimitResource)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("try to use reserved: " + getQueueName()
+ + " usedResources: " + queueUsage.getUsed()
+ + ", clusterResources: " + clusterResource
+ + ", reservedResources: " + resourceCouldBeUnreserved
+ + ", capacity-without-reserved: "
+ + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ + currentLimitResource);
+ }
+ return true;
+ }
+ }
+
+ // Otherwise, if the new total for any label of this node exceeds the queue
+ // limit, we cannot allocate on this node. (No epsilon is applied here.)
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
+ newTotalResource, currentLimitResource)) {
+ return false;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getQueueName()
+ + "Check assign to queue, label=" + label
+ + " usedResources: " + queueUsage.getUsed(label)
+ + " clusterResources: " + clusterResource
+ + " currentUsedCapacity "
+ + Resources.divide(resourceCalculator, clusterResource,
+ queueUsage.getUsed(label),
+ labelManager.getResourceByLabel(label, clusterResource))
+ + " max-capacity: "
+ + queueCapacities.getAbsoluteMaximumCapacity(label)
+ + ")");
+ }
+ return true;
+ }
+
+ // Actually, this will not happen, since labelCanAccess will always be
+ // non-empty
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 0a60acc..1a9448a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -189,13 +189,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
* @param node node on which resources are available
- * @param needToUnreserve assign container only if it can unreserve one first
* @param resourceLimits how much overall resource of this queue can use.
* @return the assignment
*/
public CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, boolean needToUnreserve,
- ResourceLimits resourceLimits);
+ FiCaSchedulerNode node, ResourceLimits resourceLimits);
/**
* A container assigned to the queue has completed.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 756e537..c86c0ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1061,9 +1061,14 @@ public class CapacityScheduler extends
node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
- CSAssignment assignment = queue.assignContainers(clusterResource, node,
- false, new ResourceLimits(
- clusterResource));
+ CSAssignment assignment =
+ queue.assignContainers(
+ clusterResource,
+ node,
+ // TODO, now we only consider limits for parent for non-labeled
+ // resources, should consider labeled resources as well.
+ new ResourceLimits(labelManager.getResourceByLabel(
+ RMNodeLabelsManager.NO_LABEL, clusterResource)));
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
@@ -1087,8 +1092,13 @@ public class CapacityScheduler extends
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
- root.assignContainers(clusterResource, node, false, new ResourceLimits(
- clusterResource));
+ root.assignContainers(
+ clusterResource,
+ node,
+ // TODO, now we only consider limits for parent for non-labeled
+ // resources, should consider labeled resources as well.
+ new ResourceLimits(labelManager.getResourceByLabel(
+ RMNodeLabelsManager.NO_LABEL, clusterResource)));
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
@@ -1209,6 +1219,13 @@ public class CapacityScheduler extends
usePortForNodeName, nodeManager.getNodeLabels());
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.activateNode(nodeManager.getNodeID(),
+ nodeManager.getTotalCapability());
+ }
+
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = numNodeManagers.incrementAndGet();
@@ -1220,12 +1237,6 @@ public class CapacityScheduler extends
if (scheduleAsynchronously && numNodes == 1) {
asyncSchedulerThread.beginSchedule();
}
-
- // update this node to node label manager
- if (labelManager != null) {
- labelManager.activateNode(nodeManager.getNodeID(),
- nodeManager.getTotalCapability());
- }
}
private synchronized void removeNode(RMNode nodeInfo) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a607a62..dd6a894 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
@Private
@Unstable
@@ -157,7 +156,7 @@ public class LeafQueue extends AbstractCSQueue {
// and all queues may not be realized yet, we'll use (optimistic)
// absoluteMaxCapacity (it will be replaced with the more accurate
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
- computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+ setQueueResourceLimitsInfo(clusterResource);
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
userLimit = conf.getUserLimit(getQueuePath());
@@ -739,9 +738,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, boolean needToUnreserve,
- ResourceLimits currentResourceLimits) {
- this.currentResourceLimits = currentResourceLimits;
+ FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+ updateCurrentResourceLimits(currentResourceLimits, clusterResource);
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
@@ -796,7 +794,7 @@ public class LeafQueue extends AbstractCSQueue {
continue;
}
if (!this.reservationsContinueLooking) {
- if (!needContainers(application, priority, required)) {
+ if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
@@ -818,8 +816,8 @@ public class LeafQueue extends AbstractCSQueue {
required, requestedNodeLabels);
// Check queue max-capacity limit
- if (!canAssignToThisQueue(clusterResource, required,
- node.getLabels(), application, true)) {
+ if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
+ this.currentResourceLimits, required, application.getCurrentReservation())) {
return NULL_ASSIGNMENT;
}
@@ -835,7 +833,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to schedule
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
- null, needToUnreserve);
+ null);
// Did the application skip this node?
if (assignment.getSkipped()) {
@@ -896,7 +894,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority,
- rmContainer, false);
+ rmContainer);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
@@ -938,102 +936,14 @@ public class LeafQueue extends AbstractCSQueue {
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
return headroom;
}
-
- synchronized boolean canAssignToThisQueue(Resource clusterResource,
- Resource required, Set<String> nodeLabels, FiCaSchedulerApp application,
- boolean checkReservations) {
- // Get label of this queue can access, it's (nodeLabel AND queueLabel)
- Set<String> labelCanAccess;
- if (null == nodeLabels || nodeLabels.isEmpty()) {
- labelCanAccess = new HashSet<String>();
- // Any queue can always access any node without label
- labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
- } else {
- labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
- }
-
- boolean canAssign = true;
- for (String label : labelCanAccess) {
- Resource potentialTotalCapacity =
- Resources.add(queueUsage.getUsed(label), required);
-
- float potentialNewCapacity =
- Resources.divide(resourceCalculator, clusterResource,
- potentialTotalCapacity,
- labelManager.getResourceByLabel(label, clusterResource));
- // if enabled, check to see if could we potentially use this node instead
- // of a reserved node if the application has reserved containers
- // TODO, now only consider reservation cases when the node has no label
- if (this.reservationsContinueLooking && checkReservations
- && label.equals(RMNodeLabelsManager.NO_LABEL)) {
- float potentialNewWithoutReservedCapacity = Resources.divide(
- resourceCalculator,
- clusterResource,
- Resources.subtract(potentialTotalCapacity,
- application.getCurrentReservation()),
- labelManager.getResourceByLabel(label, clusterResource));
-
- if (potentialNewWithoutReservedCapacity <= queueCapacities
- .getAbsoluteMaximumCapacity()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("try to use reserved: "
- + getQueueName()
- + " usedResources: "
- + queueUsage.getUsed()
- + " clusterResources: "
- + clusterResource
- + " reservedResources: "
- + application.getCurrentReservation()
- + " currentCapacity "
- + Resources.divide(resourceCalculator, clusterResource,
- queueUsage.getUsed(), clusterResource) + " required " + required
- + " potentialNewWithoutReservedCapacity: "
- + potentialNewWithoutReservedCapacity + " ( "
- + " max-capacity: "
- + queueCapacities.getAbsoluteMaximumCapacity() + ")");
- }
- // we could potentially use this node instead of reserved node
- return true;
- }
- }
-
- // Otherwise, if any of the label of this node beyond queue limit, we
- // cannot allocate on this node. Consider a small epsilon here.
- if (potentialNewCapacity > queueCapacities
- .getAbsoluteMaximumCapacity(label) + 1e-4) {
- canAssign = false;
- break;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(getQueueName()
- + "Check assign to queue, label=" + label
- + " usedResources: " + queueUsage.getUsed(label)
- + " clusterResources: " + clusterResource
- + " currentCapacity "
- + Resources.divide(resourceCalculator, clusterResource,
- queueUsage.getUsed(label),
- labelManager.getResourceByLabel(label, clusterResource))
- + " potentialNewCapacity: " + potentialNewCapacity + " ( "
- + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
- + ")");
- }
- }
-
- return canAssign;
- }
- private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
+ private void setQueueResourceLimitsInfo(
Resource clusterResource) {
- Resource queueCurrentResourceLimit =
- getCurrentResourceLimit(clusterResource, currentResourceLimits);
-
synchronized (queueResourceLimitsInfo) {
- queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
+ queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
+ .getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource);
}
-
- return queueCurrentResourceLimit;
}
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
@@ -1048,16 +958,16 @@ public class LeafQueue extends AbstractCSQueue {
computeUserLimit(application, clusterResource, required,
queueUser, requestedLabels);
- Resource currentResourceLimit =
- computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+ setQueueResourceLimitsInfo(clusterResource);
Resource headroom =
- getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
+ getHeadroom(queueUser, currentResourceLimits.getLimit(),
+ clusterResource, userLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
- " queueMaxAvailRes=" + currentResourceLimit +
+ " queueMaxAvailRes=" + currentResourceLimits.getLimit() +
" consumed=" + queueUser.getUsed() +
" headroom=" + headroom);
}
@@ -1207,8 +1117,8 @@ public class LeafQueue extends AbstractCSQueue {
return true;
}
- boolean needContainers(FiCaSchedulerApp application, Priority priority,
- Resource required) {
+ boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
+ Priority priority, Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
@@ -1240,7 +1150,7 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, boolean needToUnreserve) {
+ RMContainer reservedContainer) {
Resource assigned = Resources.none();
NodeType requestType = null;
@@ -1252,7 +1162,7 @@ public class LeafQueue extends AbstractCSQueue {
requestType = NodeType.NODE_LOCAL;
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
- node, application, priority, reservedContainer, needToUnreserve,
+ node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
@@ -1280,7 +1190,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
- node, application, priority, reservedContainer, needToUnreserve,
+ node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
@@ -1308,7 +1218,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
- node, application, priority, reservedContainer, needToUnreserve,
+ node, application, priority, reservedContainer,
allocatedContainer);
// update locality statistics
@@ -1320,13 +1230,24 @@ public class LeafQueue extends AbstractCSQueue {
return SKIP_ASSIGNMENT;
}
+
+ private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
+ // First compute the minimum resource that needs to be unreserved:
+ // minimum-resource-need-unreserve = used + asked - limit
+ Resource minimumUnreservedResource =
+ Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
+ currentResourceLimits.getLimit());
+ return minimumUnreservedResource;
+ }
@Private
protected boolean findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- Resource capability) {
+ Resource askedResource, Resource minimumUnreservedResource) {
// need to unreserve some other container first
- NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
+ NodeId idToUnreserve =
+ application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
+ resourceCalculator, clusterResource);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "
@@ -1343,7 +1264,7 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("unreserving for app: " + application.getApplicationId()
+ " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: "
- + node.getNodeID() + " needing: " + capability);
+ + node.getNodeID() + " needing: " + askedResource);
}
// headroom
@@ -1364,15 +1285,7 @@ public class LeafQueue extends AbstractCSQueue {
@Private
protected boolean checkLimitsToReserve(Resource clusterResource,
- FiCaSchedulerApp application, Resource capability,
- boolean needToUnreserve) {
- if (needToUnreserve) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("we needed to unreserve to be able to allocate");
- }
- return false;
- }
-
+ FiCaSchedulerApp application, Resource capability) {
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
@@ -1380,7 +1293,8 @@ public class LeafQueue extends AbstractCSQueue {
// Check queue max-capacity limit,
// TODO: Consider reservation on labels
- if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
+ if (!canAssignToThisQueue(clusterResource, null,
+ this.currentResourceLimits, capability, Resources.none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
@@ -1402,43 +1316,40 @@ public class LeafQueue extends AbstractCSQueue {
private Resource assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, boolean needToUnreserve,
- MutableObject allocatedContainer) {
+ RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
- needToUnreserve, allocatedContainer);
+ allocatedContainer);
}
return Resources.none();
}
- private Resource assignRackLocalContainers(
- Resource clusterResource, ResourceRequest rackLocalResourceRequest,
- FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, boolean needToUnreserve,
- MutableObject allocatedContainer) {
+ private Resource assignRackLocalContainers(Resource clusterResource,
+ ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority,
+ RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
- needToUnreserve, allocatedContainer);
+ allocatedContainer);
}
return Resources.none();
}
- private Resource assignOffSwitchContainers(
- Resource clusterResource, ResourceRequest offSwitchResourceRequest,
- FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, boolean needToUnreserve,
- MutableObject allocatedContainer) {
+ private Resource assignOffSwitchContainers(Resource clusterResource,
+ ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority,
+ RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
- needToUnreserve, allocatedContainer);
+ allocatedContainer);
}
return Resources.none();
@@ -1522,13 +1433,12 @@ public class LeafQueue extends AbstractCSQueue {
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
- boolean needToUnreserve, MutableObject createdContainer) {
+ MutableObject createdContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
+ " priority=" + priority.getPriority()
- + " request=" + request + " type=" + type
- + " needToUnreserve= " + needToUnreserve);
+ + " request=" + request + " type=" + type);
}
// check if the resource request can access the label
@@ -1548,12 +1458,14 @@ public class LeafQueue extends AbstractCSQueue {
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
- if (!Resources.fitsIn(capability, totalResource)) {
+ if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
+ capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return Resources.none();
}
+
assert Resources.greaterThan(
resourceCalculator, clusterResource, available, Resources.none());
@@ -1566,18 +1478,9 @@ public class LeafQueue extends AbstractCSQueue {
LOG.warn("Couldn't get container for allocation!");
return Resources.none();
}
-
- // default to true since if reservation continue look feature isn't on
- // needContainers is checked earlier and we wouldn't have gotten this far
- boolean canAllocContainer = true;
- if (this.reservationsContinueLooking) {
- // based on reservations can we allocate/reserve more or do we need
- // to unreserve one first
- canAllocContainer = needContainers(application, priority, capability);
- if (LOG.isDebugEnabled()) {
- LOG.debug("can alloc container is: " + canAllocContainer);
- }
- }
+
+ boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
+ application, priority, capability);
// Can we allocate a container on this node?
int availableContainers =
@@ -1588,25 +1491,25 @@ public class LeafQueue extends AbstractCSQueue {
// Did we previously reserve containers at this 'priority'?
if (rmContainer != null) {
unreserve(application, priority, node, rmContainer);
- } else if (this.reservationsContinueLooking
- && (!canAllocContainer || needToUnreserve)) {
- // need to unreserve some other container first
- boolean res = findNodeToUnreserve(clusterResource, node, application,
- priority, capability);
- if (!res) {
- return Resources.none();
- }
- } else {
- // we got here by possibly ignoring queue capacity limits. If the
- // parameter needToUnreserve is true it means we ignored one of those
- // limits in the chance we could unreserve. If we are here we aren't
- // trying to unreserve so we can't allocate anymore due to that parent
- // limit.
- if (needToUnreserve) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("we needed to unreserve to be able to allocate, skipping");
+ } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
+ // When reservationsContinueLooking is set, we may need to unreserve
+ // some containers to meet this queue's and its parents' resource limits.
+ // TODO: this needs to change when we want to support continuous
+ // reservation looking for labeled partitions.
+ Resource minimumUnreservedResource =
+ getMinimumResourceNeedUnreserved(capability);
+ if (!shouldAllocOrReserveNewContainer
+ || Resources.greaterThan(resourceCalculator, clusterResource,
+ minimumUnreservedResource, Resources.none())) {
+ boolean containerUnreserved =
+ findNodeToUnreserve(clusterResource, node, application, priority,
+ capability, minimumUnreservedResource);
+ // When minimum-unreserved-resource > 0, or when we cannot allocate or
+ // reserve a new container, we *have to* unreserve some resource to
+ // continue. If we failed to unreserve some resource, we cannot continue.
+ if (!containerUnreserved) {
+ return Resources.none();
}
- return Resources.none();
}
}
@@ -1632,17 +1535,16 @@ public class LeafQueue extends AbstractCSQueue {
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
- if ((canAllocContainer) || (rmContainer != null)) {
-
- if (reservationsContinueLooking) {
- // we got here by possibly ignoring parent queue capacity limits. If
- // the parameter needToUnreserve is true it means we ignored one of
- // those limits in the chance we could unreserve. If we are here
- // we aren't trying to unreserve so we can't allocate
- // anymore due to that parent limit
- boolean res = checkLimitsToReserve(clusterResource, application, capability,
- needToUnreserve);
- if (!res) {
+ if (shouldAllocOrReserveNewContainer || rmContainer != null) {
+
+ if (reservationsContinueLooking && rmContainer == null) {
+ // We may be ignoring parent queue capacity limits when
+ // reservationsContinueLooking is set.
+ // If we are trying to reserve a container here, no container will be
+ // unreserved to make room for the new one, so check the limits again
+ // before reserving the new container.
+ if (!checkLimitsToReserve(clusterResource,
+ application, capability)) {
return Resources.none();
}
}
@@ -1784,18 +1686,36 @@ public class LeafQueue extends AbstractCSQueue {
Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
+
+ private void updateCurrentResourceLimits(
+ ResourceLimits currentResourceLimits, Resource clusterResource) {
+ // TODO: need to consider non-empty node labels when resource limits
+ // support node labels.
+ // Even though ParentQueue sets limits that respect the child's max queue
+ // capacity, CapacityScheduler doesn't do this when allocating a reserved
+ // container, so we need to cap the limit by the queue's max capacity here.
+ this.currentResourceLimits = currentResourceLimits;
+ Resource queueMaxResource =
+ Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+ .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+ queueCapacities
+ .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
+ minimumAllocation);
+ this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
+ clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
+ }
@Override
public synchronized void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
- this.currentResourceLimits = currentResourceLimits;
+ updateCurrentResourceLimits(currentResourceLimits, clusterResource);
lastClusterResource = clusterResource;
updateAbsoluteCapacityResource(clusterResource);
// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
- computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+ setQueueResourceLimitsInfo(clusterResource);
// Update metrics
CSQueueUtils.updateQueueStatistics(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 7feaa15..5ed6bb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
-import com.google.common.collect.Sets;
-
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
@@ -380,8 +376,7 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, boolean needToUnreserve,
- ResourceLimits resourceLimits) {
+ FiCaSchedulerNode node, ResourceLimits resourceLimits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
Set<String> nodeLabels = node.getLabels();
@@ -397,21 +392,18 @@ public class ParentQueue extends AbstractCSQueue {
+ getQueueName());
}
- boolean localNeedToUnreserve = false;
-
// Are we over maximum-capacity for this queue?
- if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
- // check to see if we could if we unreserve first
- localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
- if (!localNeedToUnreserve) {
- break;
- }
+ // This will also consider parent's limits and also continuous reservation
+ // looking
+ if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
+ minimumAllocation, Resources.createResource(getMetrics()
+ .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+ break;
}
// Schedule
CSAssignment assignedToChild =
- assignContainersToChildQueues(clusterResource, node,
- localNeedToUnreserve | needToUnreserve, resourceLimits);
+ assignContainersToChildQueues(clusterResource, node, resourceLimits);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
@@ -459,74 +451,6 @@ public class ParentQueue extends AbstractCSQueue {
return assignment;
}
- private synchronized boolean canAssignToThisQueue(Resource clusterResource,
- Set<String> nodeLabels) {
- Set<String> labelCanAccess =
- new HashSet<String>(
- accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
- : Sets.intersection(accessibleLabels, nodeLabels));
- if (nodeLabels.isEmpty()) {
- // Any queue can always access any node without label
- labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
- }
-
- boolean canAssign = true;
- for (String label : labelCanAccess) {
- float currentAbsoluteLabelUsedCapacity =
- Resources.divide(resourceCalculator, clusterResource,
- queueUsage.getUsed(label),
- labelManager.getResourceByLabel(label, clusterResource));
- // if any of the label doesn't beyond limit, we can allocate on this node
- if (currentAbsoluteLabelUsedCapacity >=
- queueCapacities.getAbsoluteMaximumCapacity(label)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
- + " current-capacity (" + queueUsage.getUsed(label) + ") "
- + " >= max-capacity ("
- + labelManager.getResourceByLabel(label, clusterResource) + ")");
- }
- canAssign = false;
- break;
- }
- }
-
- return canAssign;
- }
-
-
- private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
- if (this.reservationsContinueLooking) {
- // check to see if we could potentially use this node instead of a reserved
- // node
-
- Resource reservedResources = Resources.createResource(getMetrics()
- .getReservedMB(), getMetrics().getReservedVirtualCores());
- float capacityWithoutReservedCapacity = Resources.divide(
- resourceCalculator, clusterResource,
- Resources.subtract(queueUsage.getUsed(), reservedResources),
- clusterResource);
-
- if (capacityWithoutReservedCapacity <= queueCapacities
- .getAbsoluteMaximumCapacity()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("parent: try to use reserved: " + getQueueName()
- + " usedResources: " + queueUsage.getUsed().getMemory()
- + " clusterResources: " + clusterResource.getMemory()
- + " reservedResources: " + reservedResources.getMemory()
- + " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
- / clusterResource.getMemory()
- + " potentialNewWithoutReservedCapacity: "
- + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
- + queueCapacities.getAbsoluteMaximumCapacity() + ")");
- }
- // we could potentially use this node instead of reserved node
- return true;
- }
- }
- return false;
- }
-
-
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
@@ -534,28 +458,38 @@ public class ParentQueue extends AbstractCSQueue {
}
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
- Resource clusterResource, ResourceLimits myLimits) {
- /*
- * Set head-room of a given child, limit =
- * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
- * + child.used. To avoid any of this queue's and its ancestors' limit
- * being violated
- */
- Resource myCurrentLimit =
- getCurrentResourceLimit(clusterResource, myLimits);
- // My available resource = my-current-limit - my-used-resource
- Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
- getUsedResources());
- // Child's limit = my-available-resource + resource-already-used-by-child
+ Resource clusterResource, ResourceLimits parentLimits) {
+ // Set resource-limit of a given child, child.limit =
+ // min(my.limit - my.used + child.used, child.max)
+
+ // Parent available resource = parent-limit - parent-used-resource
+ Resource parentMaxAvailableResource =
+ Resources.subtract(parentLimits.getLimit(), getUsedResources());
+
+ // Child's limit = parent-available-resource + child-used
Resource childLimit =
- Resources.add(myMaxAvailableResource, child.getUsedResources());
-
+ Resources.add(parentMaxAvailableResource, child.getUsedResources());
+
+ // Get child's max resource
+ Resource childConfiguredMaxResource =
+ Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+ .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+ child.getAbsoluteMaximumCapacity(), minimumAllocation);
+
+ // Child's limit should be capped by child configured max resource
+ childLimit =
+ Resources.min(resourceCalculator, clusterResource, childLimit,
+ childConfiguredMaxResource);
+
+ // Normalize before return
+ childLimit =
+ Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
+
return new ResourceLimits(childLimit);
}
private synchronized CSAssignment assignContainersToChildQueues(
- Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
- ResourceLimits limits) {
+ Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@@ -573,9 +507,7 @@ public class ParentQueue extends AbstractCSQueue {
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits);
- assignment =
- childQueue.assignContainers(cluster, node, needToUnreserve,
- childLimits);
+ assignment = childQueue.assignContainers(cluster, node, childLimits);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9f97b13..6cc2777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -274,7 +274,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
synchronized public NodeId getNodeIdToUnreserve(Priority priority,
- Resource capability) {
+ Resource resourceNeedUnreserve, ResourceCalculator rc,
+ Resource clusterResource) {
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
@@ -283,16 +284,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
+ NodeId nodeId = entry.getKey();
+ Resource containerResource = entry.getValue().getContainer().getResource();
+
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
- if (Resources.fitsIn(capability, entry.getValue().getContainer()
- .getResource())) {
+ if (Resources.lessThanOrEqual(rc, clusterResource,
+ resourceNeedUnreserve, containerResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
- + entry.getValue().getContainer().getResource()
- + " in order to allocate container with size: " + capability);
+ + containerResource
+ + " in order to allocate container with size: " + resourceNeedUnreserve);
}
- return entry.getKey();
+ return nodeId;
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 8cad057..1ca5c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -611,7 +611,7 @@ public class TestApplicationLimits {
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
- queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -631,7 +631,7 @@ public class TestApplicationLimits {
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
- queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@@ -651,7 +651,7 @@ public class TestApplicationLimits {
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
- queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -660,7 +660,7 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
- queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 83ab104..7a265dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.ComparisonFailure;
import org.junit.Test;
import org.mockito.Mockito;
@@ -2483,6 +2484,64 @@ public class TestCapacityScheduler {
Assert.assertEquals(30 * GB,
am1.doHeartbeat().getAvailableResources().getMemory());
}
+
+ @Test
+ public void testParentQueueMaxCapsAreRespected() throws Exception {
+ /*
+ * Queue tree:
+ * Root
+ * / \
+ * A B
+ * / \
+ * A1 A2
+ */
+ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+ csConf.setCapacity(A, 50);
+ csConf.setMaximumCapacity(A, 50);
+ csConf.setCapacity(B, 50);
+
+ // Define 2nd-level queues
+ csConf.setQueues(A, new String[] {"a1", "a2"});
+ csConf.setCapacity(A1, 50);
+ csConf.setUserLimitFactor(A1, 100.0f);
+ csConf.setCapacity(A2, 50);
+ csConf.setUserLimitFactor(A2, 100.0f);
+ csConf.setCapacity(B1, B1_CAPACITY);
+ csConf.setUserLimitFactor(B1, 100.0f);
+
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
+
+ // Try to launch app2 in a2, asking for 2GB; this should succeed
+ RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+ try {
+ // Try to allocate a container, a's usage=11G/max=12
+ // a1's usage=9G/max=12
+ // a2's usage=2G/max=12
+ // In this case, if a2 asks for another 2G, the allocation should fail.
+ waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
+ } catch (AssertionError failure) {
+ // Expected, return;
+ return;
+ }
+ Assert.fail("Shouldn't successfully allocate containers for am2, "
+ + "queue-a's max capacity will be violated if container allocated");
+ }
private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 7edb17d..71dc523 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -145,7 +144,7 @@ public class TestChildQueueOrder {
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue)
- .assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+ .assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
// Mock the node's resource availability
@@ -157,7 +156,7 @@ public class TestChildQueueOrder {
return new CSAssignment(allocatedResource, type);
}
}).
- when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+ when(queue).assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
doNothing().when(node).releaseContainer(any(Container.class));
}
@@ -274,7 +273,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
for(int i=0; i < 2; i++)
{
@@ -282,7 +281,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
for(int i=0; i < 3; i++)
@@ -291,7 +290,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
for(int i=0; i < 4; i++)
@@ -300,7 +299,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
verifyQueueMetrics(a, 1*GB, clusterResource);
@@ -334,7 +333,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
verifyQueueMetrics(a, 3*GB, clusterResource);
@@ -362,7 +361,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 3*GB, clusterResource);
@@ -389,7 +388,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -404,13 +403,13 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
InOrder allocationOrder = inOrder(d,b);
allocationOrder.verify(d).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+ any(FiCaSchedulerNode.class), any(ResourceLimits.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+ any(FiCaSchedulerNode.class), any(ResourceLimits.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);