You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/03 20:32:06 UTC

[28/43] hadoop git commit: YARN-3265. Fixed a deadlock in CapacityScheduler by always passing a queue's available resource-limit from the parent queue. Contributed by Wangda Tan.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index ead5719..a5a2e5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -294,11 +295,13 @@ public class TestLeafQueue {
 	  //Verify the value for getAMResourceLimit for queues with < .1 maxcap
 	  Resource clusterResource = Resource.newInstance(50 * GB, 50);
 	  
-	  a.updateClusterResource(clusterResource);
+    a.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
 	  assertEquals(Resource.newInstance(1 * GB, 1), 
 	    a.getAMResourceLimit());
     
-	  b.updateClusterResource(clusterResource);
+	  b.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
 	  assertEquals(Resource.newInstance(5 * GB, 1), 
 	    b.getAMResourceLimit());
   }
@@ -347,7 +350,8 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(
         (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
         a.getMetrics().getAvailableMB());
@@ -482,7 +486,8 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -492,7 +497,8 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -500,7 +506,8 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getMetrics().getAllocatedMB());
     
     // Can't allocate 3rd due to user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -509,7 +516,8 @@ public class TestLeafQueue {
     
     // Bump up user-limit-factor, now allocate should work
     a.setUserLimitFactor(10);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -517,7 +525,8 @@ public class TestLeafQueue {
     assertEquals(3*GB, a.getMetrics().getAllocatedMB());
 
     // One more should work, for app_1, due to user-limit-factor
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -527,7 +536,8 @@ public class TestLeafQueue {
     // Test max-capacity
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -642,19 +652,22 @@ public class TestLeafQueue {
 //            recordFactory)));
 
     // 1 container to user_0
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Again one to user_0 since he hasn't exceeded user limit yet
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
 
     // One more to user_0 since he is the only active user
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
@@ -705,7 +718,8 @@ public class TestLeafQueue {
     assertEquals("There should only be 1 active user!",
         1, qb.getActiveUsersManager().getNumActiveUsers());
     //get headroom
-    qb.assignContainers(clusterResource, node_0, false);
+    qb.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
         null);
@@ -724,7 +738,8 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
             u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_2, user_1);
-    qb.assignContainers(clusterResource, node_1, false);
+    qb.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
         null);
@@ -766,8 +781,10 @@ public class TestLeafQueue {
              u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_1, user_0);
     qb.submitApplicationAttempt(app_3, user_1);
-    qb.assignContainers(clusterResource, node_0, false);
-    qb.assignContainers(clusterResource, node_0, false);
+    qb.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
+    qb.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
         .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
         null);
@@ -785,7 +802,8 @@ public class TestLeafQueue {
     app_4.updateResourceRequests(Collections.singletonList(
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
                       u0Priority, recordFactory)));
-    qb.assignContainers(clusterResource, node_1, false);
+    qb.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
         null);
@@ -857,7 +875,8 @@ public class TestLeafQueue {
             TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
                 priority, recordFactory)));
 
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -873,7 +892,8 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
             priority, recordFactory)));
 
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -961,7 +981,8 @@ public class TestLeafQueue {
         1, a.getActiveUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -971,7 +992,8 @@ public class TestLeafQueue {
       // the application is not yet active
 
     // Again one to user_0 since he hasn't exceeded user limit yet
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -987,7 +1009,8 @@ public class TestLeafQueue {
 
     // No more to user_0 since he is already over user-limit
     // and no more containers to queue since it's already at max-cap
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1000,7 +1023,8 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
             priority, recordFactory)));
     assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(0*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap 
   }
 
@@ -1070,21 +1094,24 @@ public class TestLeafQueue {
      */
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
     
     // Can't allocate 3rd due to user-limit
     a.setUserLimit(25);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1102,7 +1129,8 @@ public class TestLeafQueue {
     // Now allocations should goto app_2 since 
     // user_0 is at limit inspite of high user-limit-factor
     a.setUserLimitFactor(10);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1111,7 +1139,8 @@ public class TestLeafQueue {
 
     // Now allocations should goto app_0 since 
     // user_0 is at user-limit not above it
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1121,7 +1150,8 @@ public class TestLeafQueue {
     // Test max-capacity
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1132,7 +1162,8 @@ public class TestLeafQueue {
     // Now, allocations should goto app_3 since it's under user-limit 
     a.setMaxCapacity(1.0f);
     a.setUserLimitFactor(1);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(7*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1140,7 +1171,8 @@ public class TestLeafQueue {
     assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
 
     // Now we should assign to app_3 again since user_2 is under user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1239,7 +1271,8 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1249,7 +1282,8 @@ public class TestLeafQueue {
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1257,7 +1291,8 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getMetrics().getAllocatedMB());
     
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1273,7 +1308,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1289,7 +1325,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1356,7 +1393,8 @@ public class TestLeafQueue {
 
     // Start testing...
 
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1365,7 +1403,8 @@ public class TestLeafQueue {
     assertEquals(0*GB, a.getMetrics().getAvailableMB());
 
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1378,7 +1417,8 @@ public class TestLeafQueue {
     // We do not need locality delay here
     doReturn(-1).when(a).getNodeLocalityDelay();
     
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(10*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1394,7 +1434,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
@@ -1462,20 +1503,23 @@ public class TestLeafQueue {
     // Start testing...
     
     // Only 1 container
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
     
     // Now, reservation should kick in for app_1
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1489,7 +1533,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1498,7 +1543,8 @@ public class TestLeafQueue {
     assertEquals(1, app_1.getReReservations(priority));
 
     // Re-reserve
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1507,7 +1553,8 @@ public class TestLeafQueue {
     assertEquals(2, app_1.getReReservations(priority));
     
     // Try to schedule on node_1 now, should *move* the reservation
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(9*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1524,7 +1571,8 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    CSAssignment assignment = a.assignContainers(clusterResource, node_0, false);
+    CSAssignment assignment = a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1595,7 +1643,8 @@ public class TestLeafQueue {
     CSAssignment assignment = null;
     
     // Start with off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false);
+    assignment = a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
@@ -1603,7 +1652,8 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Another off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false);
+    assignment = a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
@@ -1611,7 +1661,8 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     // Another off switch, shouldn't allocate due to delay scheduling
-    assignment = a.assignContainers(clusterResource, node_2, false);
+    assignment = a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
@@ -1620,7 +1671,8 @@ public class TestLeafQueue {
     
     // Another off switch, now we should allocate 
     // since missedOpportunities=3 and reqdContainers=3
-    assignment = a.assignContainers(clusterResource, node_2, false);
+    assignment = a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
@@ -1628,7 +1680,8 @@ public class TestLeafQueue {
     assertEquals(NodeType.OFF_SWITCH, assignment.getType());
     
     // NODE_LOCAL - node_0
-    assignment = a.assignContainers(clusterResource, node_0, false);
+    assignment = a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1636,7 +1689,8 @@ public class TestLeafQueue {
     assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     // NODE_LOCAL - node_1
-    assignment = a.assignContainers(clusterResource, node_1, false);
+    assignment = a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1664,13 +1718,15 @@ public class TestLeafQueue {
     doReturn(1).when(a).getNodeLocalityDelay();
     
     // Shouldn't assign RACK_LOCAL yet
-    assignment = a.assignContainers(clusterResource, node_3, false);
+    assignment = a.assignContainers(clusterResource, node_3, false,
+        new ResourceLimits(clusterResource));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(2, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Should assign RACK_LOCAL now
-    assignment = a.assignContainers(clusterResource, node_3, false);
+    assignment = a.assignContainers(clusterResource, node_3, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1751,7 +1807,8 @@ public class TestLeafQueue {
     
     // Start with off switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
@@ -1763,7 +1820,8 @@ public class TestLeafQueue {
 
     // Another off-switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
@@ -1774,7 +1832,8 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
@@ -1785,7 +1844,8 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Now, DATA_LOCAL for P1
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1796,7 +1856,8 @@ public class TestLeafQueue {
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Now, OFF_SWITCH for P2
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1872,7 +1933,8 @@ public class TestLeafQueue {
     app_0.updateResourceRequests(app_0_requests_0);
     
     // NODE_LOCAL - node_0_0
-    a.assignContainers(clusterResource, node_0_0, false);
+    a.assignContainers(clusterResource, node_0_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1880,7 +1942,8 @@ public class TestLeafQueue {
 
     // No allocation on node_1_0 even though it's node/rack local since
     // required(ANY) == 0
-    a.assignContainers(clusterResource, node_1_0, false);
+    a.assignContainers(clusterResource, node_1_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
@@ -1896,14 +1959,16 @@ public class TestLeafQueue {
 
     // No allocation on node_0_1 even though it's node/rack local since
     // required(rack_1) == 0
-    a.assignContainers(clusterResource, node_0_1, false);
+    a.assignContainers(clusterResource, node_0_1, false,
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority)); 
     assertEquals(1, app_0.getTotalRequiredResources(priority));
     
     // NODE_LOCAL - node_1_0
-    a.assignContainers(clusterResource, node_1_0, false);
+    a.assignContainers(clusterResource, node_1_0, false,
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -2030,7 +2095,9 @@ public class TestLeafQueue {
     assertEquals(2, e.activeApplications.size());
     assertEquals(1, e.pendingApplications.size());
 
-    e.updateClusterResource(Resources.createResource(200 * 16 * GB, 100 * 32));
+    Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); 
+    e.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
 
     // after updating cluster resource
     assertEquals(3, e.activeApplications.size());
@@ -2153,7 +2220,8 @@ public class TestLeafQueue {
     
     // node_0_1  
     // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
-    a.assignContainers(clusterResource, node_0_1, false);
+    a.assignContainers(clusterResource, node_0_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2175,7 +2243,8 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since RR(rack_1) = relax: false
-    a.assignContainers(clusterResource, node_1_1, false);
+    a.assignContainers(clusterResource, node_1_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2205,7 +2274,8 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since node_1_1 is blacklisted
-    a.assignContainers(clusterResource, node_1_1, false);
+    a.assignContainers(clusterResource, node_1_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2233,7 +2303,8 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since rack_1 is blacklisted
-    a.assignContainers(clusterResource, node_1_1, false);
+    a.assignContainers(clusterResource, node_1_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2259,7 +2330,8 @@ public class TestLeafQueue {
     // Blacklist: < host_0_0 >       <----
 
     // Now, should allocate since RR(rack_1) = relax: true
-    a.assignContainers(clusterResource, node_1_1, false);
+    a.assignContainers(clusterResource, node_1_1, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
@@ -2289,7 +2361,8 @@ public class TestLeafQueue {
     // host_1_0: 8G
     // host_1_1: 7G
 
-    a.assignContainers(clusterResource, node_1_0, false);
+    a.assignContainers(clusterResource, node_1_0, false, 
+        new ResourceLimits(clusterResource));
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
@@ -2323,7 +2396,8 @@ public class TestLeafQueue {
 
     Resource newClusterResource = Resources.createResource(100 * 20 * GB,
         100 * 32);
-    a.updateClusterResource(newClusterResource);
+    a.updateClusterResource(newClusterResource, 
+        new ResourceLimits(newClusterResource));
     //  100 * 20 * 0.2 = 400
     assertEquals(a.getAMResourceLimit(), Resources.createResource(400 * GB, 1));
   }
@@ -2370,7 +2444,8 @@ public class TestLeafQueue {
             recordFactory)));
 
     try {
-      a.assignContainers(clusterResource, node_0, false);
+      a.assignContainers(clusterResource, node_0, false, 
+          new ResourceLimits(clusterResource));
     } catch (NullPointerException e) {
       Assert.fail("NPE when allocating container on node but "
           + "forget to set off-switch request should be handled");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 696ad7a..4f89386 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -36,7 +36,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -47,12 +46,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
@@ -154,8 +155,9 @@ public class TestParentQueue {
         
         // Next call - nothing
         if (allocation > 0) {
-          doReturn(new CSAssignment(Resources.none(), type)).
-            when(queue).assignContainers(eq(clusterResource), eq(node), eq(false));
+          doReturn(new CSAssignment(Resources.none(), type)).when(queue)
+              .assignContainers(eq(clusterResource), eq(node), eq(false),
+                  any(ResourceLimits.class));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -166,7 +168,8 @@ public class TestParentQueue {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node), eq(false));
+when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
+        any(ResourceLimits.class));
   }
   
   private float computeQueueAbsoluteUsedCapacity(CSQueue queue, 
@@ -229,19 +232,21 @@ public class TestParentQueue {
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
     // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
     stubQueueAllocation(a, clusterResource, node_1, 2*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -249,12 +254,13 @@ public class TestParentQueue {
     // since A has 2/6G while B has 2/14G
     stubQueueAllocation(a, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -262,12 +268,13 @@ public class TestParentQueue {
     // since A has 3/6G while B has 4/14G
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 4*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
 
@@ -275,12 +282,13 @@ public class TestParentQueue {
     // since A has 3/6G while B has 8/14G
     stubQueueAllocation(a, clusterResource, node_1, 1*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(a, b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -441,7 +449,8 @@ public class TestParentQueue {
     stubQueueAllocation(b, clusterResource, node_0, 0*GB);
     stubQueueAllocation(c, clusterResource, node_0, 1*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 0*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -453,7 +462,8 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_1, 0*GB);
     stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
     stubQueueAllocation(c, clusterResource, node_1, 0*GB);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -464,14 +474,15 @@ public class TestParentQueue {
     stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
     stubQueueAllocation(c, clusterResource, node_0, 2*GB);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, c, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -490,16 +501,17 @@ public class TestParentQueue {
     stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
     stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
     stubQueueAllocation(c, clusterResource, node_2, 1*GB);
-    root.assignContainers(clusterResource, node_2, false);
+    root.assignContainers(clusterResource, node_2, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(a, a2, a1, b, c);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -599,7 +611,8 @@ public class TestParentQueue {
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
@@ -607,12 +620,13 @@ public class TestParentQueue {
     // also, B gets a scheduling opportunity since A allocates RACK_LOCAL
     stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     
@@ -621,12 +635,13 @@ public class TestParentQueue {
     // However, since B returns off-switch, A won't get an opportunity
     stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -665,7 +680,8 @@ public class TestParentQueue {
     // Simulate B3 returning a container on node_0
     stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     verifyQueueMetrics(b2, 0*GB, clusterResource);
     verifyQueueMetrics(b3, 1*GB, clusterResource);
     
@@ -673,12 +689,13 @@ public class TestParentQueue {
     // also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
     stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_1, false);
+    root.assignContainers(clusterResource, node_1, false, 
+        new ResourceLimits(clusterResource));
     InOrder allocationOrder = inOrder(b2, b3);
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
     
@@ -687,12 +704,13 @@ public class TestParentQueue {
     // However, since B3 returns off-switch, B2 won't get an opportunity
     stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
-    root.assignContainers(clusterResource, node_0, false);
+    root.assignContainers(clusterResource, node_0, false, 
+        new ResourceLimits(clusterResource));
     allocationOrder = inOrder(b3, b2);
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyBoolean());
+        any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);
 
@@ -774,4 +792,8 @@ public class TestParentQueue {
   @After
   public void tearDown() throws Exception {
   }
+  
+  private ResourceLimits anyResourceLimits() {
+    return any(ResourceLimits.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/14dd647c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 985609e..4c6b25f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -262,7 +263,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -273,7 +275,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -284,7 +287,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -298,7 +302,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -313,7 +318,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // assign reducer to node 2
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -329,7 +335,8 @@ public class TestReservations {
 
     // node_1 heartbeat and unreserves from node_0 in order to allocate
     // on node_1
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -411,7 +418,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -422,7 +430,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -433,7 +442,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -447,7 +457,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -462,7 +473,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // assign reducer to node 2
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -478,7 +490,8 @@ public class TestReservations {
 
     // node_1 heartbeat and won't unreserve from node_0, potentially stuck
     // if AM doesn't handle
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -552,7 +565,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -562,7 +576,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -572,7 +587,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -585,7 +601,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // try to assign reducer (5G on node 0 and should reserve)
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -599,7 +616,8 @@ public class TestReservations {
     assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
 
     // could allocate but told need to unreserve first
-    a.assignContainers(clusterResource, node_1, true);
+    a.assignContainers(clusterResource, node_1, true,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -792,7 +810,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -802,7 +821,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -812,7 +832,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -833,7 +854,8 @@ public class TestReservations {
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -966,7 +988,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -976,7 +999,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -986,7 +1010,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_1.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -999,7 +1024,8 @@ public class TestReservations {
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, app_0.getCurrentReservation().getMemory());
@@ -1096,7 +1122,8 @@ public class TestReservations {
 
     // Start testing...
     // Only AM
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1107,7 +1134,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map - simulating reduce
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1118,7 +1146,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // Only 1 map to other node - simulating reduce
-    a.assignContainers(clusterResource, node_1, false);
+    a.assignContainers(clusterResource, node_1, false,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1132,7 +1161,8 @@ public class TestReservations {
     // try to assign reducer (5G on node 0), but tell it
     // it has to unreserve. No room to allocate and shouldn't reserve
     // since nothing currently reserved.
-    a.assignContainers(clusterResource, node_0, true);
+    a.assignContainers(clusterResource, node_0, true,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1146,7 +1176,8 @@ public class TestReservations {
     // try to assign reducer (5G on node 2), but tell it
     // it has to unreserve. Has room but shouldn't reserve
     // since nothing currently reserved.
-    a.assignContainers(clusterResource, node_2, true);
+    a.assignContainers(clusterResource, node_2, true,
+        new ResourceLimits(clusterResource));
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1158,7 +1189,8 @@ public class TestReservations {
     assertEquals(0 * GB, node_2.getUsedResource().getMemory());
 
     // let it assign 5G to node_2
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1170,7 +1202,8 @@ public class TestReservations {
     assertEquals(5 * GB, node_2.getUsedResource().getMemory());
 
     // reserve 8G node_0
-    a.assignContainers(clusterResource, node_0, false);
+    a.assignContainers(clusterResource, node_0, false,
+        new ResourceLimits(clusterResource));
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());
@@ -1184,7 +1217,8 @@ public class TestReservations {
     // try to assign (8G on node 2). No room to allocate,
     // continued to try due to having reservation above,
     // but hits queue limits so can't reserve anymore.
-    a.assignContainers(clusterResource, node_2, false);
+    a.assignContainers(clusterResource, node_2, false,
+        new ResourceLimits(clusterResource));
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());