You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/03/17 18:24:45 UTC

[1/2] hadoop git commit: YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obeys its capacity limits. Contributed by Wangda Tan.

Repository: hadoop
Updated Branches:
  refs/heads/trunk a89b087c4 -> 487374b7f


http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index a5a2e5f..972cabb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -350,8 +350,8 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false,
-        new ResourceLimits(clusterResource));
+    a.assignContainers(clusterResource, node_0, new ResourceLimits(
+        clusterResource));
     assertEquals(
         (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
         a.getMetrics().getAvailableMB());
@@ -486,7 +486,7 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -497,7 +497,7 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -506,7 +506,7 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getMetrics().getAllocatedMB());
     
     // Can't allocate 3rd due to user-limit
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -516,7 +516,7 @@ public class TestLeafQueue {
     
     // Bump up user-limit-factor, now allocate should work
     a.setUserLimitFactor(10);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -525,7 +525,7 @@ public class TestLeafQueue {
     assertEquals(3*GB, a.getMetrics().getAllocatedMB());
 
     // One more should work, for app_1, due to user-limit-factor
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -536,8 +536,8 @@ public class TestLeafQueue {
     // Test max-capacity
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
-    a.assignContainers(clusterResource, node_0, false,
-        new ResourceLimits(clusterResource));
+    a.assignContainers(clusterResource, node_0, new ResourceLimits(
+        clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -652,21 +652,21 @@ public class TestLeafQueue {
 //            recordFactory)));
 
     // 1 container to user_0
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Again one to user_0 since he hasn't exceeded user limit yet
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
 
     // One more to user_0 since he is the only active user
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -718,7 +718,7 @@ public class TestLeafQueue {
     assertEquals("There should only be 1 active user!",
         1, qb.getActiveUsersManager().getNumActiveUsers());
     //get headroom
-    qb.assignContainers(clusterResource, node_0, false,
+    qb.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@@ -738,7 +738,7 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
             u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_2, user_1);
-    qb.assignContainers(clusterResource, node_1, false,
+    qb.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@@ -781,9 +781,9 @@ public class TestLeafQueue {
              u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_1, user_0);
     qb.submitApplicationAttempt(app_3, user_1);
-    qb.assignContainers(clusterResource, node_0, false,
+    qb.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
-    qb.assignContainers(clusterResource, node_0, false,
+    qb.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
         .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
@@ -802,7 +802,7 @@ public class TestLeafQueue {
     app_4.updateResourceRequests(Collections.singletonList(
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
                       u0Priority, recordFactory)));
-    qb.assignContainers(clusterResource, node_1, false,
+    qb.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@@ -875,7 +875,7 @@ public class TestLeafQueue {
             TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
                 priority, recordFactory)));
 
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -892,7 +892,7 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
             priority, recordFactory)));
 
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -981,7 +981,7 @@ public class TestLeafQueue {
         1, a.getActiveUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -992,7 +992,7 @@ public class TestLeafQueue {
       // the application is not yet active
 
     // Again one to user_0 since he hasn't exceeded user limit yet
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1009,7 +1009,7 @@ public class TestLeafQueue {
 
     // No more to user_0 since he is already over user-limit
     // and no more containers to queue since it's already at max-cap
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1023,7 +1023,7 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
             priority, recordFactory)));
     assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(0*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap 
   }
@@ -1094,7 +1094,7 @@ public class TestLeafQueue {
      */
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1102,7 +1102,7 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1110,7 +1110,7 @@ public class TestLeafQueue {
     
     // Can't allocate 3rd due to user-limit
     a.setUserLimit(25);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1129,7 +1129,7 @@ public class TestLeafQueue {
     // Now allocations should goto app_2 since 
     // user_0 is at limit inspite of high user-limit-factor
     a.setUserLimitFactor(10);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1139,7 +1139,7 @@ public class TestLeafQueue {
 
     // Now allocations should goto app_0 since 
     // user_0 is at user-limit not above it
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -1150,7 +1150,7 @@ public class TestLeafQueue {
     // Test max-capacity
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -1162,7 +1162,7 @@ public class TestLeafQueue {
     // Now, allocations should goto app_3 since it's under user-limit 
     a.setMaxCapacity(1.0f);
     a.setUserLimitFactor(1);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(7*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -1171,7 +1171,7 @@ public class TestLeafQueue {
     assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
 
     // Now we should assign to app_3 again since user_2 is under user-limit
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@@ -1271,7 +1271,7 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1282,7 +1282,7 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1291,7 +1291,7 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getMetrics().getAllocatedMB());
     
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1308,7 +1308,7 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1325,7 +1325,7 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1393,7 +1393,7 @@ public class TestLeafQueue {
 
     // Start testing...
 
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1403,7 +1403,7 @@ public class TestLeafQueue {
     assertEquals(0*GB, a.getMetrics().getAvailableMB());
 
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1417,7 +1417,7 @@ public class TestLeafQueue {
     // We do not need locality delay here
     doReturn(-1).when(a).getNodeLocalityDelay();
     
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(10*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1434,7 +1434,7 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1503,7 +1503,7 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1511,14 +1511,14 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
     
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1533,7 +1533,7 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1543,7 +1543,7 @@ public class TestLeafQueue {
     assertEquals(1, app_1.getReReservations(priority));
 
     // Re-reserve
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1553,7 +1553,7 @@ public class TestLeafQueue {
     assertEquals(2, app_1.getReReservations(priority));
     
     // Try to schedule on node_1 now, should *move* the reservation
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(9*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@@ -1571,7 +1571,7 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    CSAssignment assignment = a.assignContainers(clusterResource, node_0, false,
+    CSAssignment assignment = a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1643,7 +1643,7 @@ public class TestLeafQueue {
     CSAssignment assignment = null;
     
     // Start with off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false,
+    assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1652,7 +1652,7 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Another off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false,
+    assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1661,7 +1661,7 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     // Another off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false,
+    assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1671,7 +1671,7 @@ public class TestLeafQueue {
     
     // Another off switch, now we should allocate 
     // since missedOpportunities=3 and reqdContainers=3
-    assignment = a.assignContainers(clusterResource, node_2, false,
+    assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1680,7 +1680,7 @@ public class TestLeafQueue {
     assertEquals(NodeType.OFF_SWITCH, assignment.getType());
     
     // NODE_LOCAL - node_0
-    assignment = a.assignContainers(clusterResource, node_0, false,
+    assignment = a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1689,7 +1689,7 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     // NODE_LOCAL - node_1
-    assignment = a.assignContainers(clusterResource, node_1, false,
+    assignment = a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1718,14 +1718,14 @@ public class TestLeafQueue {
     doReturn(1).when(a).getNodeLocalityDelay();
     
     // Shouldn't assign RACK_LOCAL yet
-    assignment = a.assignContainers(clusterResource, node_3, false,
+    assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(2, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Should assign RACK_LOCAL now
-    assignment = a.assignContainers(clusterResource, node_3, false,
+    assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1807,7 +1807,7 @@ public class TestLeafQueue {
     
     // Start with off switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
-    a.assignContainers(clusterResource, node_2, false,
+    a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1820,7 +1820,7 @@ public class TestLeafQueue {
 
     // Another off-switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
-    a.assignContainers(clusterResource, node_2, false,
+    a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1832,7 +1832,7 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
-    a.assignContainers(clusterResource, node_2, false,
+    a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1844,7 +1844,7 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Now, DATA_LOCAL for P1
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1856,7 +1856,7 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Now, OFF_SWITCH for P2
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
@@ -1933,7 +1933,7 @@ public class TestLeafQueue {
     app_0.updateResourceRequests(app_0_requests_0);
     
     // NODE_LOCAL - node_0_1
-    a.assignContainers(clusterResource, node_0_0, false,
+    a.assignContainers(clusterResource, node_0_0,
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1942,7 +1942,7 @@ public class TestLeafQueue {
 
     // No allocation on node_1_0 even though it's node/rack local since
     // required(ANY) == 0
-    a.assignContainers(clusterResource, node_1_0, false,
+    a.assignContainers(clusterResource, node_1_0,
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1959,7 +1959,7 @@ public class TestLeafQueue {
 
     // No allocation on node_0_1 even though it's node/rack local since
     // required(rack_1) == 0
-    a.assignContainers(clusterResource, node_0_1, false,
+    a.assignContainers(clusterResource, node_0_1,
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -1967,7 +1967,7 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority));
     
     // NODE_LOCAL - node_1
-    a.assignContainers(clusterResource, node_1_0, false,
+    a.assignContainers(clusterResource, node_1_0,
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2220,7 +2220,7 @@ public class TestLeafQueue {
     
     // node_0_1  
     // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
-    a.assignContainers(clusterResource, node_0_1, false, 
+    a.assignContainers(clusterResource, node_0_1, 
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2243,7 +2243,7 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since RR(rack_1) = relax: false
-    a.assignContainers(clusterResource, node_1_1, false, 
+    a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2274,7 +2274,7 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since node_1_1 is blacklisted
-    a.assignContainers(clusterResource, node_1_1, false, 
+    a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2303,7 +2303,7 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since rack_1 is blacklisted
-    a.assignContainers(clusterResource, node_1_1, false, 
+    a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2330,7 +2330,7 @@ public class TestLeafQueue {
     // Blacklist: < host_0_0 >       <----
 
     // Now, should allocate since RR(rack_1) = relax: true
-    a.assignContainers(clusterResource, node_1_1, false, 
+    a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource));
     verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2361,7 +2361,7 @@ public class TestLeafQueue {
     // host_1_0: 8G
     // host_1_1: 7G
 
-    a.assignContainers(clusterResource, node_1_0, false, 
+    a.assignContainers(clusterResource, node_1_0, 
         new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
@@ -2444,7 +2444,7 @@ public class TestLeafQueue {
             recordFactory)));
 
     try {
-      a.assignContainers(clusterResource, node_0, false, 
+      a.assignContainers(clusterResource, node_0, 
           new ResourceLimits(clusterResource));
     } catch (NullPointerException e) {
       Assert.fail("NPE when allocating container on node but "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 4f89386..7da1c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -156,7 +156,7 @@ public class TestParentQueue {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).when(queue)
-              .assignContainers(eq(clusterResource), eq(node), eq(false),
+              .assignContainers(eq(clusterResource), eq(node),
                   any(ResourceLimits.class));
 
           // Mock the node's resource availability
@@ -167,8 +167,7 @@ public class TestParentQueue {
 
         return new CSAssignment(allocatedResource, type);
       }
-    }).
-when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
+    }).when(queue).assignContainers(eq(clusterResource), eq(node),
         any(ResourceLimits.class));
   }
   
@@ -232,7 +231,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
@@ -240,13 +239,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
     stubQueueAllocation(a, clusterResource, node_1, 2*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-    root.assignContainers(clusterResource, node_1, false, 
+    root.assignContainers(clusterResource, node_1, 
         new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -254,13 +253,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // since A has 2/6G while B has 2/14G
     stubQueueAllocation(a, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -268,13 +267,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // since A has 3/6G while B has 4/14G
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 4*GB);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
 
@@ -282,13 +281,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // since A has 3/6G while B has 8/14G
     stubQueueAllocation(a, clusterResource, node_1, 1*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-    root.assignContainers(clusterResource, node_1, false, 
+    root.assignContainers(clusterResource, node_1, 
         new ResourceLimits(clusterResource));
     allocationOrder = inOrder(a, b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -405,6 +404,22 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
 
   @Test
   public void testMultiLevelQueues() throws Exception {
+    /*
+     * Structure of queue:
+     *            Root
+     *           ____________
+     *          /    |   \   \
+     *         A     B    C   D
+     *       / |   / | \   \
+     *      A1 A2 B1 B2 B3  C1
+     *                        \
+     *                         C11
+     *                           \
+     *                           C111
+     *                             \
+     *                              C1111
+     */
+    
     // Setup queue configs
     setupMultiLevelQueues(csConf);
     
@@ -449,7 +464,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 1*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 0*GB, clusterResource);
@@ -462,7 +477,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     stubQueueAllocation(a, clusterResource, node_1, 0*GB);
     stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
     stubQueueAllocation(c, clusterResource, node_1, 0*GB);
-    root.assignContainers(clusterResource, node_1, false, 
+    root.assignContainers(clusterResource, node_1, 
         new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -474,15 +489,15 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
     stubQueueAllocation(c, clusterResource, node_0, 2*GB);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, c, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -501,17 +516,17 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
     stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
     stubQueueAllocation(c, clusterResource, node_2, 1*GB);
-    root.assignContainers(clusterResource, node_2, false, 
+    root.assignContainers(clusterResource, node_2, 
         new ResourceLimits(clusterResource));
     allocationOrder = inOrder(a, a2, a1, b, c);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(a2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -611,7 +626,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
@@ -620,13 +635,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // also, B gets a scheduling opportunity since A allocates RACK_LOCAL
     stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_1, false, 
+    root.assignContainers(clusterResource, node_1, 
         new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     
@@ -635,13 +650,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // However, since B returns off-switch, A won't get an opportunity
     stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -680,7 +695,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // Simulate B3 returning a container on node_0
     stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     verifyQueueMetrics(b2, 0*GB, clusterResource);
     verifyQueueMetrics(b3, 1*GB, clusterResource);
@@ -689,13 +704,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
     stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_1, false, 
+    root.assignContainers(clusterResource, node_1, 
         new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(b2, b3);
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
     
@@ -704,13 +719,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
     // However, since B3 returns off-switch, B2 won't get an opportunity
     stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false, 
+    root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b3, b2);
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits());
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index b3250e5..c5b7587 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -265,7 +265,7 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -277,7 +277,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -289,7 +289,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -304,7 +304,7 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -320,7 +320,7 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // assign reducer to node 2
-    a.assignContainers(clusterResource, node_2, false,
+    a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -337,7 +337,7 @@ public class TestReservations {
 
     // node_1 heartbeat and unreserves from node_0 in order to allocate
     // on node_1
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
@@ -421,7 +421,7 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -433,7 +433,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -445,7 +445,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -460,7 +460,7 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -476,7 +476,7 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // assign reducer to node 2
-    a.assignContainers(clusterResource, node_2, false,
+    a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -493,7 +493,7 @@ public class TestReservations {
 
     // node_1 heartbeat and won't unreserve from node_0, potentially stuck
     // if AM doesn't handle
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -569,7 +569,7 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -580,7 +580,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -591,7 +591,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -605,7 +605,7 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -620,7 +620,7 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // could allocate but told need to unreserve first
-    a.assignContainers(clusterResource, node_1, true,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -652,6 +652,8 @@ public class TestReservations {
     String host_1 = "host_1";
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
         8 * GB);
+    
+    Resource clusterResource = Resources.createResource(2 * 8 * GB);
 
     // Setup resource-requests
     Priority priorityMap = TestUtils.createMockPriority(5);
@@ -681,23 +683,28 @@ public class TestReservations {
         node_0.getNodeID(), "user", rmContext);
 
     // no reserved containers
-    NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    NodeId unreserveId =
+        app_0.getNodeIdToUnreserve(priorityMap, capability,
+            cs.getResourceCalculator(), clusterResource);
     assertEquals(null, unreserveId);
 
     // no reserved containers - reserve then unreserve
     app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
     app_0.unreserve(node_0, priorityMap);
-    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
+        cs.getResourceCalculator(), clusterResource);
     assertEquals(null, unreserveId);
 
     // no container large enough is reserved
     app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
-    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
+        cs.getResourceCalculator(), clusterResource);
     assertEquals(null, unreserveId);
 
     // reserve one that is now large enough
     app_0.reserve(node_1, priorityMap, rmContainer, container);
-    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
+    unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
+        cs.getResourceCalculator(), clusterResource);
     assertEquals(node_1.getNodeID(), unreserveId);
   }
 
@@ -741,14 +748,14 @@ public class TestReservations {
 
     // nothing reserved
     boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
-        node_1, app_0, priorityMap, capability);
+        node_1, app_0, priorityMap, capability, capability);
     assertFalse(res);
 
     // reserved but scheduler doesn't know about that node.
     app_0.reserve(node_1, priorityMap, rmContainer, container);
     node_1.reserveResource(app_0, priorityMap, rmContainer);
     res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
-        priorityMap, capability);
+        priorityMap, capability, capability);
     assertFalse(res);
   }
 
@@ -815,7 +822,7 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -826,7 +833,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -837,7 +844,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -852,14 +859,15 @@ public class TestReservations {
     // absoluteMaxCapacity
     Resource capability = Resources.createResource(32 * GB, 0);
     boolean res =
-        a.canAssignToThisQueue(clusterResource, capability,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
+        a.canAssignToThisQueue(clusterResource,
+            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+                clusterResource), capability, Resources.none());
     assertFalse(res);
 
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -872,14 +880,17 @@ public class TestReservations {
 
     capability = Resources.createResource(5 * GB, 0);
     res =
-        a.canAssignToThisQueue(clusterResource, capability,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
+        a.canAssignToThisQueue(clusterResource,
+            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+                clusterResource), capability, Resources
+                .createResource(5 * GB));
     assertTrue(res);
 
     // tell to not check reservations
     res =
-        a.canAssignToThisQueue(clusterResource, capability,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
+        a.canAssignToThisQueue(clusterResource,
+            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+                clusterResource), capability, Resources.none());
     assertFalse(res);
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
@@ -887,13 +898,16 @@ public class TestReservations {
     // should return false no matter what checkReservations is passed
     // in since feature is off
     res =
-        a.canAssignToThisQueue(clusterResource, capability,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
+        a.canAssignToThisQueue(clusterResource,
+            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+                clusterResource), capability, Resources.none());
     assertFalse(res);
 
     res =
-        a.canAssignToThisQueue(clusterResource, capability,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
+        a.canAssignToThisQueue(clusterResource,
+            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
+                clusterResource), capability, Resources
+                .createResource(5 * GB));
     assertFalse(res);
   }
 
@@ -985,15 +999,15 @@ public class TestReservations {
         .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
             priorityAM, recordFactory)));
     app_0.updateResourceRequests(Collections.singletonList(TestUtils
-        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
-            priorityReduce, recordFactory)));
-    app_0.updateResourceRequests(Collections.singletonList(TestUtils
         .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
             priorityMap, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
+            priorityReduce, recordFactory)));
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1004,7 +1018,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1015,7 +1029,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1029,7 +1043,7 @@ public class TestReservations {
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1117,18 +1131,18 @@ public class TestReservations {
         .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
             priorityAM, recordFactory)));
     app_0.updateResourceRequests(Collections.singletonList(TestUtils
-        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
-            priorityReduce, recordFactory)));
-    app_0.updateResourceRequests(Collections.singletonList(TestUtils
         .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
             priorityMap, recordFactory)));
     app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
+            priorityReduce, recordFactory)));
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
         .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
             priorityLast, recordFactory)));
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1140,7 +1154,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1152,7 +1166,7 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false,
+    a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1164,38 +1178,41 @@ public class TestReservations {
     assertEquals(3 * GB, node_1.getUsedResource().getMemory());
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
-    // try to assign reducer (5G on node 0), but tell it
-    // it has to unreserve. No room to allocate and shouldn't reserve
-    // since nothing currently reserved.
-    a.assignContainers(clusterResource, node_0, true,
-        new ResourceLimits(clusterResource));
+    // try to assign reducer (5G on node 0), but tell it its resource limit <
+    // used (8G) + required (5G). It will not reserve since it would have to
+    // unreserve some resource. Even with continuous reservation looking, we
+    // don't allow unreserving resource in order to reserve a container.
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(Resources.createResource(10 * GB)));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
     assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
     assertEquals(16 * GB, a.getMetrics().getAvailableMB());
-    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    // app_0's headroom = limit (10G) - used (8G) = 2G 
+    assertEquals(2 * GB, app_0.getHeadroom().getMemory());
     assertEquals(5 * GB, node_0.getUsedResource().getMemory());
     assertEquals(3 * GB, node_1.getUsedResource().getMemory());
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
-    // try to assign reducer (5G on node 2), but tell it
-    // it has to unreserve. Has room but shouldn't reserve
-    // since nothing currently reserved.
-    a.assignContainers(clusterResource, node_2, true,
-        new ResourceLimits(clusterResource));
+    // try to assign reducer (5G on node 2), but tell it its resource limit <
+    // used (8G) + required (5G). It will not reserve since it would have to
+    // unreserve some resource. Unfortunately, there's nothing to unreserve.
+    a.assignContainers(clusterResource, node_2,
+        new ResourceLimits(Resources.createResource(10 * GB)));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
     assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
     assertEquals(16 * GB, a.getMetrics().getAvailableMB());
-    assertEquals(16 * GB, app_0.getHeadroom().getMemory());
+    // app_0's headroom = limit (10G) - used (8G) = 2G 
+    assertEquals(2 * GB, app_0.getHeadroom().getMemory());
     assertEquals(5 * GB, node_0.getUsedResource().getMemory());
     assertEquals(3 * GB, node_1.getUsedResource().getMemory());
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // let it assign 5G to node_2
-    a.assignContainers(clusterResource, node_2, false,
+    a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1208,7 +1225,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_2.getUsedResource().getMemory());
 
     // reserve 8G node_0
-    a.assignContainers(clusterResource, node_0, false,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource));
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@@ -1223,7 +1240,7 @@ public class TestReservations {
     // try to assign (8G on node 2). No room to allocate,
     // continued to try due to having reservation above,
     // but hits queue limits so can't reserve anymore.
-    a.assignContainers(clusterResource, node_2, false,
+    a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource));
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());


[2/2] hadoop git commit: YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.

Posted by ji...@apache.org.
YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/487374b7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/487374b7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/487374b7

Branch: refs/heads/trunk
Commit: 487374b7fe0c92fc7eb1406c568952722b5d5b15
Parents: a89b087
Author: Jian He <ji...@apache.org>
Authored: Tue Mar 17 10:22:15 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue Mar 17 10:24:23 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/AbstractCSQueue.java     | 112 ++++++-
 .../scheduler/capacity/CSQueue.java             |   4 +-
 .../scheduler/capacity/CapacityScheduler.java   |  33 ++-
 .../scheduler/capacity/LeafQueue.java           | 292 +++++++------------
 .../scheduler/capacity/ParentQueue.java         | 140 +++------
 .../scheduler/common/fica/FiCaSchedulerApp.java |  16 +-
 .../capacity/TestApplicationLimits.java         |   8 +-
 .../capacity/TestCapacityScheduler.java         |  59 ++++
 .../scheduler/capacity/TestChildQueueOrder.java |  25 +-
 .../scheduler/capacity/TestLeafQueue.java       | 142 ++++-----
 .../scheduler/capacity/TestParentQueue.java     |  97 +++---
 .../scheduler/capacity/TestReservations.java    | 147 +++++-----
 13 files changed, 561 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 82934ad..f5b72d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -56,6 +56,9 @@ Release 2.8.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    YARN-3243. CapacityScheduler should pass headroom from parent to children
+    to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index d800709..4e53060 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
@@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.collect.Sets;
 
 public abstract class AbstractCSQueue implements CSQueue {
+  private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
   
   CSQueue parent;
   final String queueName;
@@ -406,21 +411,102 @@ public abstract class AbstractCSQueue implements CSQueue {
                                         parentQ.getPreemptionDisabled());
   }
   
-  protected Resource getCurrentResourceLimit(Resource clusterResource,
-      ResourceLimits currentResourceLimits) {
+  private Resource getCurrentLimitResource(String nodeLabel,
+      Resource clusterResource, ResourceLimits currentResourceLimits) {
     /*
-     * Queue's max available resource = min(my.max, my.limit)
-     * my.limit is set by my parent, considered used resource of my siblings
+     * Current limit resource: For labeled resource: limit = queue-max-resource
+     * (TODO: this part needs updating when we support labeled-limit). For
+     * non-labeled resource: limit = min(queue-max-resource,
+     * limit-set-by-parent)
      */
     Resource queueMaxResource =
-        Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
-            queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
-    Resource queueCurrentResourceLimit =
-        Resources.min(resourceCalculator, clusterResource, queueMaxResource,
-            currentResourceLimits.getLimit());
-    queueCurrentResourceLimit =
-        Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
-            minimumAllocation);
-    return queueCurrentResourceLimit;
+        Resources.multiplyAndNormalizeDown(resourceCalculator,
+            labelManager.getResourceByLabel(nodeLabel, clusterResource),
+            queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
+    if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
+      return Resources.min(resourceCalculator, clusterResource,
+          queueMaxResource, currentResourceLimits.getLimit());
+    }
+    return queueMaxResource;
+  }
+  
+  synchronized boolean canAssignToThisQueue(Resource clusterResource,
+      Set<String> nodeLabels, ResourceLimits currentResourceLimits,
+      Resource nowRequired, Resource resourceCouldBeUnreserved) {
+    // Get the labels this queue can access: (nodeLabels AND queueLabels)
+    Set<String> labelCanAccess;
+    if (null == nodeLabels || nodeLabels.isEmpty()) {
+      labelCanAccess = new HashSet<String>();
+      // Any queue can always access any node without label
+      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+    } else {
+      labelCanAccess = new HashSet<String>(
+          accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
+              : Sets.intersection(accessibleLabels, nodeLabels));
+    }
+    
+    for (String label : labelCanAccess) {
+      // New total resource = used + required
+      Resource newTotalResource =
+          Resources.add(queueUsage.getUsed(label), nowRequired);
+
+      Resource currentLimitResource =
+          getCurrentLimitResource(label, clusterResource, currentResourceLimits);
+
+      // if reservation continuous looking is enabled, check to see if we could
+      // potentially use this node instead of a reserved node if the application
+      // has reserved containers.
+      // TODO, now only consider reservation cases when the node has no label
+      if (this.reservationsContinueLooking
+          && label.equals(RMNodeLabelsManager.NO_LABEL)
+          && Resources.greaterThan(resourceCalculator, clusterResource,
+              resourceCouldBeUnreserved, Resources.none())) {
+        // resource-without-reserved = used - reserved
+        Resource newTotalWithoutReservedResource =
+            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+        
+        // when total-used-without-reserved-resource < currentLimit, we still
+        // have chance to allocate on this node by unreserving some containers
+        if (Resources.lessThan(resourceCalculator, clusterResource,
+            newTotalWithoutReservedResource, currentLimitResource)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("try to use reserved: " + getQueueName()
+                + " usedResources: " + queueUsage.getUsed()
+                + ", clusterResources: " + clusterResource
+                + ", reservedResources: " + resourceCouldBeUnreserved
+                + ", capacity-without-reserved: "
+                + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+                + currentLimitResource); 
+          }
+          return true;
+        }
+      }
+      
+      // Otherwise, if usage for any label of this node would exceed the queue
+      // limit, we cannot allocate on this node (no epsilon is applied here).
+      if (Resources.greaterThan(resourceCalculator, clusterResource,
+          newTotalResource, currentLimitResource)) {
+        return false;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getQueueName()
+            + "Check assign to queue, label=" + label
+            + " usedResources: " + queueUsage.getUsed(label)
+            + " clusterResources: " + clusterResource
+            + " currentUsedCapacity "
+            + Resources.divide(resourceCalculator, clusterResource,
+                queueUsage.getUsed(label),
+                labelManager.getResourceByLabel(label, clusterResource))
+            + " max-capacity: "
+            + queueCapacities.getAbsoluteMaximumCapacity(label)
+            + ")");
+      }
+      return true;
+    }
+    
+    // Actually, this will not happen, since labelCanAccess will be always
+    // non-empty
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 0a60acc..1a9448a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -189,13 +189,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
-   * @param needToUnreserve assign container only if it can unreserve one first
    * @param resourceLimits how much overall resource of this queue can use. 
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits resourceLimits);
+      FiCaSchedulerNode node, ResourceLimits resourceLimits);
   
   /**
    * A container assigned to the queue has completed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 756e537..c86c0ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1061,9 +1061,14 @@ public class CapacityScheduler extends
           node.getNodeID());
       
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      CSAssignment assignment = queue.assignContainers(clusterResource, node,
-          false, new ResourceLimits(
-              clusterResource));
+      CSAssignment assignment =
+          queue.assignContainers(
+              clusterResource,
+              node,
+              // TODO, now we only consider limits for parent for non-labeled
+              // resources, should consider labeled resources as well.
+              new ResourceLimits(labelManager.getResourceByLabel(
+                  RMNodeLabelsManager.NO_LABEL, clusterResource)));
       
       RMContainer excessReservation = assignment.getExcessReservation();
       if (excessReservation != null) {
@@ -1087,8 +1092,13 @@ public class CapacityScheduler extends
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
         }
-        root.assignContainers(clusterResource, node, false, new ResourceLimits(
-            clusterResource));
+        root.assignContainers(
+            clusterResource,
+            node,
+            // TODO, now we only consider limits for parent for non-labeled
+            // resources, should consider labeled resources as well.
+            new ResourceLimits(labelManager.getResourceByLabel(
+                RMNodeLabelsManager.NO_LABEL, clusterResource)));
       }
     } else {
       LOG.info("Skipping scheduling since node " + node.getNodeID() + 
@@ -1209,6 +1219,13 @@ public class CapacityScheduler extends
         usePortForNodeName, nodeManager.getNodeLabels());
     this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.activateNode(nodeManager.getNodeID(),
+          nodeManager.getTotalCapability());
+    }
+    
     root.updateClusterResource(clusterResource, new ResourceLimits(
         clusterResource));
     int numNodes = numNodeManagers.incrementAndGet();
@@ -1220,12 +1237,6 @@ public class CapacityScheduler extends
     if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
-    
-    // update this node to node label manager
-    if (labelManager != null) {
-      labelManager.activateNode(nodeManager.getNodeID(),
-          nodeManager.getTotalCapability());
-    }
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a607a62..dd6a894 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
 
 @Private
 @Unstable
@@ -157,7 +156,7 @@ public class LeafQueue extends AbstractCSQueue {
     // and all queues may not be realized yet, we'll use (optimistic) 
     // absoluteMaxCapacity (it will be replaced with the more accurate 
     // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
-    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
     userLimit = conf.getUserLimit(getQueuePath());
@@ -739,9 +738,8 @@ public class LeafQueue extends AbstractCSQueue {
   
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits currentResourceLimits) {
-    this.currentResourceLimits = currentResourceLimits;
+      FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
@@ -796,7 +794,7 @@ public class LeafQueue extends AbstractCSQueue {
             continue;
           }
           if (!this.reservationsContinueLooking) {
-            if (!needContainers(application, priority, required)) {
+            if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("doesn't need containers based on reservation algo!");
               }
@@ -818,8 +816,8 @@ public class LeafQueue extends AbstractCSQueue {
                   required, requestedNodeLabels);          
           
           // Check queue max-capacity limit
-          if (!canAssignToThisQueue(clusterResource, required,
-              node.getLabels(), application, true)) {
+          if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
+              this.currentResourceLimits, required, application.getCurrentReservation())) {
             return NULL_ASSIGNMENT;
           }
 
@@ -835,7 +833,7 @@ public class LeafQueue extends AbstractCSQueue {
           // Try to schedule
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
-                null, needToUnreserve);
+                null);
 
           // Did the application skip this node?
           if (assignment.getSkipped()) {
@@ -896,7 +894,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Try to assign if we have sufficient resources
     assignContainersOnNode(clusterResource, node, application, priority, 
-        rmContainer, false);
+        rmContainer);
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
@@ -938,102 +936,14 @@ public class LeafQueue extends AbstractCSQueue {
         Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
     return headroom;
   }
-
-  synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      Resource required, Set<String> nodeLabels, FiCaSchedulerApp application, 
-      boolean checkReservations) {
-    // Get label of this queue can access, it's (nodeLabel AND queueLabel)
-    Set<String> labelCanAccess;
-    if (null == nodeLabels || nodeLabels.isEmpty()) {
-      labelCanAccess = new HashSet<String>();
-      // Any queue can always access any node without label
-      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
-    } else {
-      labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
-    }
-    
-    boolean canAssign = true;
-    for (String label : labelCanAccess) {
-      Resource potentialTotalCapacity =
-          Resources.add(queueUsage.getUsed(label), required);
-      
-      float potentialNewCapacity =
-          Resources.divide(resourceCalculator, clusterResource,
-              potentialTotalCapacity,
-              labelManager.getResourceByLabel(label, clusterResource));
-      // if enabled, check to see if could we potentially use this node instead
-      // of a reserved node if the application has reserved containers
-      // TODO, now only consider reservation cases when the node has no label
-      if (this.reservationsContinueLooking && checkReservations
-          && label.equals(RMNodeLabelsManager.NO_LABEL)) {
-        float potentialNewWithoutReservedCapacity = Resources.divide(
-            resourceCalculator,
-            clusterResource,
-            Resources.subtract(potentialTotalCapacity,
-               application.getCurrentReservation()),
-            labelManager.getResourceByLabel(label, clusterResource));
-
-        if (potentialNewWithoutReservedCapacity <= queueCapacities
-            .getAbsoluteMaximumCapacity()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("try to use reserved: "
-                + getQueueName()
-                + " usedResources: "
-                + queueUsage.getUsed()
-                + " clusterResources: "
-                + clusterResource
-                + " reservedResources: "
-                + application.getCurrentReservation()
-                + " currentCapacity "
-                + Resources.divide(resourceCalculator, clusterResource,
-                    queueUsage.getUsed(), clusterResource) + " required " + required
-                + " potentialNewWithoutReservedCapacity: "
-                + potentialNewWithoutReservedCapacity + " ( "
-                + " max-capacity: "
-                + queueCapacities.getAbsoluteMaximumCapacity() + ")");
-          }
-          // we could potentially use this node instead of reserved node
-          return true;
-        }
-      }
-      
-      // Otherwise, if any of the label of this node beyond queue limit, we
-      // cannot allocate on this node. Consider a small epsilon here.
-      if (potentialNewCapacity > queueCapacities
-          .getAbsoluteMaximumCapacity(label) + 1e-4) {
-        canAssign = false;
-        break;
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName()
-            + "Check assign to queue, label=" + label
-            + " usedResources: " + queueUsage.getUsed(label)
-            + " clusterResources: " + clusterResource
-            + " currentCapacity "
-            + Resources.divide(resourceCalculator, clusterResource,
-                queueUsage.getUsed(label),
-                labelManager.getResourceByLabel(label, clusterResource))
-            + " potentialNewCapacity: " + potentialNewCapacity + " ( "
-            + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
-            + ")");
-      }
-    }
-    
-    return canAssign;
-  }
   
-  private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
+  private void setQueueResourceLimitsInfo(
       Resource clusterResource) {
-    Resource queueCurrentResourceLimit =
-        getCurrentResourceLimit(clusterResource, currentResourceLimits);
-    
     synchronized (queueResourceLimitsInfo) {
-      queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
+      queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
+          .getLimit());
       queueResourceLimitsInfo.setClusterResource(clusterResource);
     }
-
-    return queueCurrentResourceLimit;
   }
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
@@ -1048,16 +958,16 @@ public class LeafQueue extends AbstractCSQueue {
         computeUserLimit(application, clusterResource, required,
             queueUser, requestedLabels);
 
-    Resource currentResourceLimit =
-        computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
     
     Resource headroom =
-        getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
+        getHeadroom(queueUser, currentResourceLimits.getLimit(),
+            clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
-          " queueMaxAvailRes=" + currentResourceLimit + 
+          " queueMaxAvailRes=" + currentResourceLimits.getLimit() + 
           " consumed=" + queueUser.getUsed() + 
           " headroom=" + headroom);
     }
@@ -1207,8 +1117,8 @@ public class LeafQueue extends AbstractCSQueue {
     return true;
   }
 
-  boolean needContainers(FiCaSchedulerApp application, Priority priority,
-      Resource required) {
+  boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
+      Priority priority, Resource required) {
     int requiredContainers = application.getTotalRequiredResources(priority);
     int reservedContainers = application.getNumReservedContainers(priority);
     int starvation = 0;
@@ -1240,7 +1150,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private CSAssignment assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve) {
+      RMContainer reservedContainer) {
     Resource assigned = Resources.none();
 
     NodeType requestType = null;
@@ -1252,7 +1162,7 @@ public class LeafQueue extends AbstractCSQueue {
       requestType = NodeType.NODE_LOCAL;
       assigned =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           assigned, Resources.none())) {
@@ -1280,7 +1190,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       assigned = 
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
           assigned, Resources.none())) {
@@ -1308,7 +1218,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       assigned =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-            node, application, priority, reservedContainer, needToUnreserve,
+            node, application, priority, reservedContainer,
             allocatedContainer);
 
       // update locality statistics
@@ -1320,13 +1230,24 @@ public class LeafQueue extends AbstractCSQueue {
     
     return SKIP_ASSIGNMENT;
   }
+  
+  private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
+    // First, compute the minimum resource that needs to be unreserved:
+    // minimum-resource-need-unreserve = used + asked - limit
+    Resource minimumUnreservedResource =
+        Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
+            currentResourceLimits.getLimit());
+    return minimumUnreservedResource;
+  }
 
   @Private
   protected boolean findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      Resource capability) {
+      Resource askedResource, Resource minimumUnreservedResource) {
     // need to unreserve some other container first
-    NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
+    NodeId idToUnreserve =
+        application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
+            resourceCalculator, clusterResource);
     if (idToUnreserve == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("checked to see if could unreserve for app but nothing "
@@ -1343,7 +1264,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("unreserving for app: " + application.getApplicationId()
         + " on nodeId: " + idToUnreserve
         + " in order to replace reserved application and place it on node: "
-        + node.getNodeID() + " needing: " + capability);
+        + node.getNodeID() + " needing: " + askedResource);
     }
 
     // headroom
@@ -1364,15 +1285,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Private
   protected boolean checkLimitsToReserve(Resource clusterResource,
-      FiCaSchedulerApp application, Resource capability,
-      boolean needToUnreserve) {
-    if (needToUnreserve) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("we needed to unreserve to be able to allocate");
-      }
-      return false;
-    }
-
+      FiCaSchedulerApp application, Resource capability) {
     // we can't reserve if we got here based on the limit
     // checks assuming we could unreserve!!!
     Resource userLimit = computeUserLimitAndSetHeadroom(application,
@@ -1380,7 +1293,8 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Check queue max-capacity limit,
     // TODO: Consider reservation on labels
-    if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
+    if (!canAssignToThisQueue(clusterResource, null,
+        this.currentResourceLimits, capability, Resources.none())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit queue limit");
       }
@@ -1402,43 +1316,40 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignNodeLocalContainers(Resource clusterResource,
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
   }
 
-  private Resource assignRackLocalContainers(
-      Resource clusterResource, ResourceRequest rackLocalResourceRequest,  
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+  private Resource assignRackLocalContainers(Resource clusterResource,
+      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
   }
 
-  private Resource assignOffSwitchContainers(
-      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
-      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, 
-      RMContainer reservedContainer, boolean needToUnreserve,
-      MutableObject allocatedContainer) {
+  private Resource assignOffSwitchContainers(Resource clusterResource,
+      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
+      FiCaSchedulerApp application, Priority priority,
+      RMContainer reservedContainer, MutableObject allocatedContainer) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          needToUnreserve, allocatedContainer);
+          allocatedContainer);
     }
     
     return Resources.none();
@@ -1522,13 +1433,12 @@ public class LeafQueue extends AbstractCSQueue {
   private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, 
       FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer,
-      boolean needToUnreserve, MutableObject createdContainer) {
+      MutableObject createdContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
         + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type
-        + " needToUnreserve= " + needToUnreserve);
+        + " request=" + request + " type=" + type);
     }
     
     // check if the resource request can access the label
@@ -1548,12 +1458,14 @@ public class LeafQueue extends AbstractCSQueue {
     Resource available = node.getAvailableResource();
     Resource totalResource = node.getTotalResource();
 
-    if (!Resources.fitsIn(capability, totalResource)) {
+    if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
+        capability, totalResource)) {
       LOG.warn("Node : " + node.getNodeID()
           + " does not have sufficient resource for request : " + request
           + " node total capability : " + node.getTotalResource());
       return Resources.none();
     }
+
     assert Resources.greaterThan(
         resourceCalculator, clusterResource, available, Resources.none());
 
@@ -1566,18 +1478,9 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.warn("Couldn't get container for allocation!");
       return Resources.none();
     }
-
-    // default to true since if reservation continue look feature isn't on
-    // needContainers is checked earlier and we wouldn't have gotten this far
-    boolean canAllocContainer = true;
-    if (this.reservationsContinueLooking) {
-      // based on reservations can we allocate/reserve more or do we need
-      // to unreserve one first
-      canAllocContainer = needContainers(application, priority, capability);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("can alloc container is: " + canAllocContainer);
-      }
-    }
+    
+    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
+        application, priority, capability);
 
     // Can we allocate a container on this node?
     int availableContainers = 
@@ -1588,25 +1491,25 @@ public class LeafQueue extends AbstractCSQueue {
       // Did we previously reserve containers at this 'priority'?
       if (rmContainer != null) {
         unreserve(application, priority, node, rmContainer);
-      } else if (this.reservationsContinueLooking
-          && (!canAllocContainer || needToUnreserve)) {
-        // need to unreserve some other container first
-        boolean res = findNodeToUnreserve(clusterResource, node, application,
-            priority, capability);
-        if (!res) {
-          return Resources.none();
-        }
-      } else {
-        // we got here by possibly ignoring queue capacity limits. If the
-        // parameter needToUnreserve is true it means we ignored one of those
-        // limits in the chance we could unreserve. If we are here we aren't
-        // trying to unreserve so we can't allocate anymore due to that parent
-        // limit.
-        if (needToUnreserve) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("we needed to unreserve to be able to allocate, skipping");
+      } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
+        // when reservationsContinueLooking is set, we may need to unreserve
+        // some containers to meet this queue and its parents' resource limits
+        // TODO: this needs to change when we want to support continuous
+        // reservation looking for labeled partitions.
+        Resource minimumUnreservedResource =
+            getMinimumResourceNeedUnreserved(capability);
+        if (!shouldAllocOrReserveNewContainer
+            || Resources.greaterThan(resourceCalculator, clusterResource,
+                minimumUnreservedResource, Resources.none())) {
+          boolean containerUnreserved =
+              findNodeToUnreserve(clusterResource, node, application, priority,
+                  capability, minimumUnreservedResource);
+          // When minimum-unreserved-resource > 0, or we cannot allocate a
+          // new/reserved container, we *have to* unreserve some resource
+          // first. If nothing could be unreserved, we cannot continue.
+          if (!containerUnreserved) {
+            return Resources.none();
           }
-          return Resources.none();
         }
       }
 
@@ -1632,17 +1535,16 @@ public class LeafQueue extends AbstractCSQueue {
     } else {
       // if we are allowed to allocate but this node doesn't have space, reserve it or
       // if this was an already a reserved container, reserve it again
-      if ((canAllocContainer) || (rmContainer != null)) {
-
-        if (reservationsContinueLooking) {
-          // we got here by possibly ignoring parent queue capacity limits. If
-          // the parameter needToUnreserve is true it means we ignored one of
-          // those limits in the chance we could unreserve. If we are here
-          // we aren't trying to unreserve so we can't allocate
-          // anymore due to that parent limit
-          boolean res = checkLimitsToReserve(clusterResource, application, capability, 
-              needToUnreserve);
-          if (!res) {
+      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
+
+        if (reservationsContinueLooking && rmContainer == null) {
+          // We could possibly be ignoring parent queue capacity limits when
+          // reservationsContinueLooking is set.
+          // If we're trying to reserve a container here, no container will be
+          // unreserved to make room for the new one. Check limits again
+          // before reserving the new container.
+          if (!checkLimitsToReserve(clusterResource, 
+              application, capability)) {
             return Resources.none();
           }
         }
@@ -1784,18 +1686,36 @@ public class LeafQueue extends AbstractCSQueue {
         Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
             queueCapacities.getAbsoluteCapacity(), minimumAllocation);
   }
+  
+  private void updateCurrentResourceLimits(
+      ResourceLimits currentResourceLimits, Resource clusterResource) {
+    // TODO: need to consider non-empty node labels when resource limits
+    // support node labels.
+    // ParentQueue sets limits that respect the child's max queue capacity,
+    // but when allocating a reserved container, CapacityScheduler doesn't do
+    // this. So we need to cap the limit by the queue's max capacity here.
+    this.currentResourceLimits = currentResourceLimits;
+    Resource queueMaxResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+            queueCapacities
+                .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
+            minimumAllocation);
+    this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
+        clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
+  }
 
   @Override
   public synchronized void updateClusterResource(Resource clusterResource,
       ResourceLimits currentResourceLimits) {
-    this.currentResourceLimits = currentResourceLimits;
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     lastClusterResource = clusterResource;
     updateAbsoluteCapacityResource(clusterResource);
     
     // Update headroom info based on new cluster resource value
     // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
     // during allocation
-    computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource);
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 7feaa15..5ed6bb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.collect.Sets;
-
 @Private
 @Evolving
 public class ParentQueue extends AbstractCSQueue {
@@ -380,8 +376,7 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits resourceLimits) {
+      FiCaSchedulerNode node, ResourceLimits resourceLimits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     Set<String> nodeLabels = node.getLabels();
@@ -397,21 +392,18 @@ public class ParentQueue extends AbstractCSQueue {
           + getQueueName());
       }
       
-      boolean localNeedToUnreserve = false;
-      
       // Are we over maximum-capacity for this queue?
-      if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
-        // check to see if we could if we unreserve first
-        localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
-        if (!localNeedToUnreserve) {
-          break;
-        }
+      // This also considers the parent's limits and continuous reservation
+      // looking.
+      if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
+          minimumAllocation, Resources.createResource(getMetrics()
+              .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+        break;
       }
       
       // Schedule
       CSAssignment assignedToChild = 
-          assignContainersToChildQueues(clusterResource, node,
-              localNeedToUnreserve | needToUnreserve, resourceLimits);
+          assignContainersToChildQueues(clusterResource, node, resourceLimits);
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
@@ -459,74 +451,6 @@ public class ParentQueue extends AbstractCSQueue {
     return assignment;
   }
 
-  private synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      Set<String> nodeLabels) {
-    Set<String> labelCanAccess =
-        new HashSet<String>(
-            accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
-                : Sets.intersection(accessibleLabels, nodeLabels));
-    if (nodeLabels.isEmpty()) {
-      // Any queue can always access any node without label
-      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
-    }
-    
-    boolean canAssign = true;
-    for (String label : labelCanAccess) {
-      float currentAbsoluteLabelUsedCapacity =
-          Resources.divide(resourceCalculator, clusterResource,
-              queueUsage.getUsed(label),
-              labelManager.getResourceByLabel(label, clusterResource));
-      // if any of the label doesn't beyond limit, we can allocate on this node
-      if (currentAbsoluteLabelUsedCapacity >= 
-            queueCapacities.getAbsoluteMaximumCapacity(label)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
-              + " current-capacity (" + queueUsage.getUsed(label) + ") "
-              + " >= max-capacity ("
-              + labelManager.getResourceByLabel(label, clusterResource) + ")");
-        }
-        canAssign = false;
-        break;
-      }
-    }
-    
-    return canAssign;
-  }
-
-  
-  private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
-    if (this.reservationsContinueLooking) {      
-      // check to see if we could potentially use this node instead of a reserved
-      // node
-
-      Resource reservedResources = Resources.createResource(getMetrics()
-          .getReservedMB(), getMetrics().getReservedVirtualCores());
-      float capacityWithoutReservedCapacity = Resources.divide(
-          resourceCalculator, clusterResource,
-          Resources.subtract(queueUsage.getUsed(), reservedResources),
-          clusterResource);
-
-      if (capacityWithoutReservedCapacity <= queueCapacities
-          .getAbsoluteMaximumCapacity()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("parent: try to use reserved: " + getQueueName()
-            + " usedResources: " + queueUsage.getUsed().getMemory()
-            + " clusterResources: " + clusterResource.getMemory()
-            + " reservedResources: " + reservedResources.getMemory()
-            + " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
-            / clusterResource.getMemory()
-            + " potentialNewWithoutReservedCapacity: "
-            + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
-            + queueCapacities.getAbsoluteMaximumCapacity() + ")");
-        }
-        // we could potentially use this node instead of reserved node
-        return true;
-      }
-    }
-    return false;
-   }
-
-  
   private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
     return (node.getReservedContainer() == null) && 
         Resources.greaterThanOrEqual(resourceCalculator, clusterResource, 
@@ -534,28 +458,38 @@ public class ParentQueue extends AbstractCSQueue {
   }
   
   private ResourceLimits getResourceLimitsOfChild(CSQueue child,
-      Resource clusterResource, ResourceLimits myLimits) {
-    /*
-     * Set head-room of a given child, limit =
-     * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
-     * + child.used. To avoid any of this queue's and its ancestors' limit
-     * being violated
-     */
-    Resource myCurrentLimit =
-        getCurrentResourceLimit(clusterResource, myLimits);
-    // My available resource = my-current-limit - my-used-resource
-    Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
-        getUsedResources());
-    // Child's limit = my-available-resource + resource-already-used-by-child
+      Resource clusterResource, ResourceLimits parentLimits) {
+    // Set resource-limit of a given child, child.limit =
+    // min(my.limit - my.used + child.used, child.max)
+
+    // Parent available resource = parent-limit - parent-used-resource
+    Resource parentMaxAvailableResource =
+        Resources.subtract(parentLimits.getLimit(), getUsedResources());
+
+    // Child's limit = parent-available-resource + child-used
     Resource childLimit =
-        Resources.add(myMaxAvailableResource, child.getUsedResources());
-    
+        Resources.add(parentMaxAvailableResource, child.getUsedResources());
+
+    // Get child's max resource
+    Resource childConfiguredMaxResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+            child.getAbsoluteMaximumCapacity(), minimumAllocation);
+
+    // Child's limit should be capped by child configured max resource
+    childLimit =
+        Resources.min(resourceCalculator, clusterResource, childLimit,
+            childConfiguredMaxResource);
+
+    // Normalize before return
+    childLimit =
+        Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
+
     return new ResourceLimits(childLimit);
   }
   
   private synchronized CSAssignment assignContainersToChildQueues(
-      Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
-      ResourceLimits limits) {
+      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -573,9 +507,7 @@ public class ParentQueue extends AbstractCSQueue {
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, limits);
       
-      assignment =
-          childQueue.assignContainers(cluster, node, needToUnreserve,
-              childLimits);
+      assignment = childQueue.assignContainers(cluster, node, childLimits);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9f97b13..6cc2777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -274,7 +274,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
   
   synchronized public NodeId getNodeIdToUnreserve(Priority priority,
-      Resource capability) {
+      Resource resourceNeedUnreserve, ResourceCalculator rc,
+      Resource clusterResource) {
 
     // first go around make this algorithm simple and just grab first
     // reservation that has enough resources
@@ -283,16 +284,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
       for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
+        NodeId nodeId = entry.getKey();
+        Resource containerResource = entry.getValue().getContainer().getResource();
+        
         // make sure we unreserve one with at least the same amount of
         // resources, otherwise could affect capacity limits
-        if (Resources.fitsIn(capability, entry.getValue().getContainer()
-            .getResource())) {
+        if (Resources.lessThanOrEqual(rc, clusterResource,
+            resourceNeedUnreserve, containerResource)) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("unreserving node with reservation size: "
-                + entry.getValue().getContainer().getResource()
-                + " in order to allocate container with size: " + capability);
+                + containerResource
+                + " in order to allocate container with size: " + resourceNeedUnreserve);
           }
-          return entry.getKey();
+          return nodeId;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 8cad057..1ca5c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -611,7 +611,7 @@ public class TestApplicationLimits {
     app_0_0.updateResourceRequests(app_0_0_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -631,7 +631,7 @@ public class TestApplicationLimits {
     app_0_1.updateResourceRequests(app_0_1_requests);
 
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@@ -651,7 +651,7 @@ public class TestApplicationLimits {
     app_1_0.updateResourceRequests(app_1_0_requests);
     
     // Schedule to compute 
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -660,7 +660,7 @@ public class TestApplicationLimits {
 
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
-    queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource)); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 83ab104..7a265dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.ComparisonFailure;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -2483,6 +2484,64 @@ public class TestCapacityScheduler {
     Assert.assertEquals(30 * GB,
         am1.doHeartbeat().getAvailableResources().getMemory());
   }
+  
+  @Test
+  public void testParentQueueMaxCapsAreRespected() throws Exception {
+    /*
+     * A's max capacity (50) must cap A1 + A2 combined. Queue tree:
+     *          Root
+     *        /     \
+     *       A       B
+     *      / \
+     *     A1 A2 
+     */
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    csConf.setCapacity(A, 50);
+    csConf.setMaximumCapacity(A, 50);
+    csConf.setCapacity(B, 50);
+    
+    // 2nd-level queues; user-limit-factor 100 presumably lifts per-user caps
+    csConf.setQueues(A, new String[] {"a1", "a2"});
+    csConf.setCapacity(A1, 50);
+    csConf.setUserLimitFactor(A1, 100.0f);
+    csConf.setCapacity(A2, 50);
+    csConf.setUserLimitFactor(A2, 100.0f);
+    csConf.setCapacity(B1, B1_CAPACITY); // NOTE(review): no setQueues(B, ..) here
+    csConf.setUserLimitFactor(B1, 100.0f);
+    
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    
+    // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB 
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
+    
+    // Launch app2 in a2; its 2GB AM request should succeed (a's usage -> 11GB)
+    RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+    try {
+      // Try to allocate one more 2G container for app2. Current state:
+      // a's usage=11G/max=12G, a1's usage=9G/max=12G, a2's usage=2G/max=12G.
+      // Granting 2G would take a to 13G > 12G, so the allocation
+      // must not happen.
+      waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
+    } catch (AssertionError failure) {
+      // Expected: the AssertionError is taken to mean nothing was allocated.
+      return;
+    }
+    Assert.fail("Shouldn't successfully allocate containers for am2, "
+        + "queue-a's max capacity will be violated if container allocated");
+  }
 
   private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 7edb17d..71dc523 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -145,7 +144,7 @@ public class TestChildQueueOrder {
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
           when(queue)
-              .assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+              .assignContainers(eq(clusterResource), eq(node),
                   any(ResourceLimits.class));
 
           // Mock the node's resource availability
@@ -157,7 +156,7 @@ public class TestChildQueueOrder {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(), 
+    when(queue).assignContainers(eq(clusterResource), eq(node), 
         any(ResourceLimits.class));
     doNothing().when(node).releaseContainer(any(Container.class));
   }
@@ -274,7 +273,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     for(int i=0; i < 2; i++)
     {
@@ -282,7 +281,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 1*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     } 
     for(int i=0; i < 3; i++)
@@ -291,7 +290,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }  
     for(int i=0; i < 4; i++)
@@ -300,7 +299,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }    
     verifyQueueMetrics(a, 1*GB, clusterResource);
@@ -334,7 +333,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(b, clusterResource, node_0, 0*GB);
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-      root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+      root.assignContainers(clusterResource, node_0, new ResourceLimits(
           clusterResource));
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
@@ -362,7 +361,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
@@ -389,7 +388,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -404,13 +403,13 @@ public class TestChildQueueOrder {
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+    root.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource));
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);