You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2013/01/09 06:08:24 UTC

svn commit: r1430682 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ hadoop-yarn...

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Jan  9 05:08:23 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -90,7 +91,7 @@ public class LeafQueue implements CSQueu
   private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
   private int maxActiveApplicationsPerUser;
   
-  private Resource usedResources = Resources.createResource(0);
+  private Resource usedResources = Resources.createResource(0, 0);
   private float usedCapacity = 0.0f;
   private volatile int numContainers;
 
@@ -126,12 +127,16 @@ public class LeafQueue implements CSQueu
   
   private final int nodeLocalityDelay;
   
+  private final ResourceCalculator resourceCalculator;
+  
   public LeafQueue(CapacitySchedulerContext cs, 
-      String queueName, CSQueue parent, 
-      Comparator<FiCaSchedulerApp> applicationComparator, CSQueue old) {
+      String queueName, CSQueue parent, CSQueue old) {
     this.scheduler = cs;
     this.queueName = queueName;
     this.parent = parent;
+    
+    this.resourceCalculator = cs.getResourceCalculator();
+
     // must be after parent and queueName are initialized
     this.metrics = old != null ? old.getMetrics() :
         QueueMetrics.forQueue(getQueuePath(), parent,
@@ -141,8 +146,9 @@ public class LeafQueue implements CSQueu
     this.minimumAllocation = cs.getMinimumResourceCapability();
     this.maximumAllocation = cs.getMaximumResourceCapability();
     this.minimumAllocationFactor = 
-        (float)(maximumAllocation.getMemory() - minimumAllocation.getMemory()) / 
-         maximumAllocation.getMemory();
+        Resources.ratio(resourceCalculator, 
+            Resources.subtract(maximumAllocation, minimumAllocation), 
+            maximumAllocation);
     this.containerTokenSecretManager = cs.getContainerTokenSecretManager();
 
     float capacity = 
@@ -171,10 +177,12 @@ public class LeafQueue implements CSQueu
             getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
     int maxActiveApplications = 
         CSQueueUtils.computeMaxActiveApplications(
+            resourceCalculator,
             cs.getClusterResources(), this.minimumAllocation,
             maxAMResourcePerQueuePercent, absoluteMaxCapacity);
     this.maxActiveAppsUsingAbsCap = 
             CSQueueUtils.computeMaxActiveApplications(
+                resourceCalculator,
                 cs.getClusterResources(), this.minimumAllocation,
                 maxAMResourcePerQueuePercent, absoluteCapacity);
     int maxActiveApplicationsPerUser = 
@@ -207,6 +215,8 @@ public class LeafQueue implements CSQueu
         + ", fullname=" + getQueuePath());
     }
 
+    Comparator<FiCaSchedulerApp> applicationComparator =
+        cs.getApplicationComparator();
     this.pendingApplications = 
         new TreeSet<FiCaSchedulerApp>(applicationComparator);
     this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
@@ -256,7 +266,8 @@ public class LeafQueue implements CSQueu
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
-        this, getParent(), clusterResource, minimumAllocation);
+        resourceCalculator, this, getParent(), clusterResource, 
+        minimumAllocation);
 
     LOG.info("Initializing " + queueName + "\n" +
         "capacity = " + capacity +
@@ -545,7 +556,7 @@ public class LeafQueue implements CSQueu
     return queueName + ": " + 
         "capacity=" + capacity + ", " + 
         "absoluteCapacity=" + absoluteCapacity + ", " + 
-        "usedResources=" + usedResources.getMemory() + "MB, " + 
+        "usedResources=" + usedResources +  
         "usedCapacity=" + getUsedCapacity() + ", " + 
         "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
         "numApps=" + getNumApplications() + ", " + 
@@ -754,7 +765,7 @@ public class LeafQueue implements CSQueu
   }
 
   private static final CSAssignment NULL_ASSIGNMENT =
-      new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
+      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
   
   @Override
   public synchronized CSAssignment 
@@ -813,7 +824,8 @@ public class LeafQueue implements CSQueu
           }
 
           // Check user limit
-          if (!assignToUser(application.getUser(), userLimit)) {
+          if (!assignToUser(
+              clusterResource, application.getUser(), userLimit)) {
             break; 
           }
 
@@ -827,7 +839,8 @@ public class LeafQueue implements CSQueu
 
           // Did we schedule or reserve a container?
           Resource assigned = assignment.getResource();
-          if (Resources.greaterThan(assigned, Resources.none())) {
+          if (Resources.greaterThan(
+              resourceCalculator, clusterResource, assigned, Resources.none())) {
 
             // Book-keeping 
             // Note: Update headroom to account for current allocation too...
@@ -882,21 +895,25 @@ public class LeafQueue implements CSQueu
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
-    return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
+    return Resources.none();
   }
 
   private synchronized boolean assignToQueue(Resource clusterResource, 
       Resource required) {
     // Check how of the cluster's absolute capacity we are currently using...
     float potentialNewCapacity = 
-      (float)(usedResources.getMemory() + required.getMemory()) / 
-        clusterResource.getMemory();
+        Resources.divide(
+            resourceCalculator, clusterResource, 
+            Resources.add(usedResources, required), 
+            clusterResource);
     if (potentialNewCapacity > absoluteMaxCapacity) {
       LOG.info(getQueueName() + 
-          " usedResources: " + usedResources.getMemory() +
-          " clusterResources: " + clusterResource.getMemory() +
-          " currentCapacity " + ((float)usedResources.getMemory())/clusterResource.getMemory() + 
-          " required " + required.getMemory() +
+          " usedResources: " + usedResources +
+          " clusterResources: " + clusterResource +
+          " currentCapacity " + 
+            Resources.divide(resourceCalculator, clusterResource, 
+                usedResources, clusterResource) + 
+          " required " + required +
           " potentialNewCapacity: " + potentialNewCapacity + " ( " +
           " max-capacity: " + absoluteMaxCapacity + ")");
       return false;
@@ -919,14 +936,18 @@ public class LeafQueue implements CSQueu
     
 
     Resource queueMaxCap =                        // Queue Max-Capacity
-        Resources.createResource(
-            CSQueueUtils.roundDown(minimumAllocation, 
-                (int)(absoluteMaxCapacity * clusterResource.getMemory()))
-            );
+        Resources.multiplyAndNormalizeDown(
+            resourceCalculator, 
+            clusterResource, 
+            absoluteMaxCapacity, 
+            minimumAllocation);
     
     Resource userConsumed = getUser(user).getConsumedResources(); 
     Resource headroom = 
-        Resources.subtract(Resources.min(userLimit, queueMaxCap), userConsumed);
+        Resources.subtract(
+            Resources.min(resourceCalculator, clusterResource, 
+                userLimit, queueMaxCap), 
+            userConsumed);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
@@ -953,35 +974,46 @@ public class LeafQueue implements CSQueu
     //   (usedResources + required) (which extra resources we are allocating)
 
     // Allow progress for queues with miniscule capacity
-    final int queueCapacity = 
-      Math.max(
-          CSQueueUtils.roundUp(
-              minimumAllocation, 
-              (int)(absoluteCapacity * clusterResource.getMemory())), 
-          required.getMemory()
-          );
-
-    final int consumed = usedResources.getMemory();
-    final int currentCapacity = 
-      (consumed < queueCapacity) ? 
-          queueCapacity : (consumed + required.getMemory());
-
+    final Resource queueCapacity =
+        Resources.max(
+            resourceCalculator, clusterResource, 
+            Resources.multiplyAndNormalizeUp(
+                resourceCalculator, 
+                clusterResource, 
+                absoluteCapacity, 
+                minimumAllocation), 
+            required);
+
+    Resource currentCapacity =
+        Resources.lessThan(resourceCalculator, clusterResource, 
+            usedResources, queueCapacity) ?
+            queueCapacity : Resources.add(usedResources, required);
+    
     // Never allow a single user to take more than the 
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
     
     final int activeUsers = activeUsersManager.getNumActiveUsers();  
-
-    int limit = 
-      CSQueueUtils.roundUp(
-          minimumAllocation,
-          Math.min(
-              Math.max(divideAndCeil(currentCapacity, activeUsers), 
-                       divideAndCeil((int)userLimit*currentCapacity, 100)),
-              (int)(queueCapacity * userLimitFactor)
-              )
-          );
+    		
+    Resource limit =
+        Resources.roundUp(
+            resourceCalculator, 
+            Resources.min(
+                resourceCalculator, clusterResource,   
+                Resources.max(
+                    resourceCalculator, clusterResource, 
+                    Resources.divideAndCeil(
+                        resourceCalculator, currentCapacity, activeUsers),
+                    Resources.divideAndCeil(
+                        resourceCalculator, 
+                        Resources.multiplyAndRoundDown(
+                            currentCapacity, userLimit), 
+                        100)
+                    ), 
+                Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor)
+                ), 
+            minimumAllocation);
 
     if (LOG.isDebugEnabled()) {
       String userName = application.getUser();
@@ -993,23 +1025,25 @@ public class LeafQueue implements CSQueu
           " consumed: " + getUser(userName).getConsumedResources() + 
           " limit: " + limit +
           " queueCapacity: " + queueCapacity + 
-          " qconsumed: " + consumed +
+          " qconsumed: " + usedResources +
           " currentCapacity: " + currentCapacity +
           " activeUsers: " + activeUsers +
-          " clusterCapacity: " + clusterResource.getMemory()
+          " clusterCapacity: " + clusterResource
       );
     }
 
-    return Resources.createResource(limit);
+    return limit;
   }
   
-  private synchronized boolean assignToUser(String userName, Resource limit) {
+  private synchronized boolean assignToUser(Resource clusterResource,
+      String userName, Resource limit) {
 
     User user = getUser(userName);
     
     // Note: We aren't considering the current request since there is a fixed
-    // overhead of the AM, but it's a > check, not a >= check, so... 
-    if ((user.getConsumedResources().getMemory()) > limit.getMemory()) {
+    // overhead of the AM, but it's a > check, not a >= check, so...
+    if (Resources.greaterThan(resourceCalculator, clusterResource, 
+            user.getConsumedResources(), limit)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName() + 
             " will exceed limit - " +  
@@ -1023,21 +1057,15 @@ public class LeafQueue implements CSQueu
     return true;
   }
 
-  static int divideAndCeil(int a, int b) {
-    if (b == 0) {
-      LOG.info("divideAndCeil called with a=" + a + " b=" + b);
-      return 0;
-    }
-    return (a + (b - 1)) / b;
-  }
-
   boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) {
     int requiredContainers = application.getTotalRequiredResources(priority);
     int reservedContainers = application.getNumReservedContainers(priority);
     int starvation = 0;
     if (reservedContainers > 0) {
       float nodeFactor = 
-          ((float)required.getMemory() / getMaximumAllocation().getMemory());
+          Resources.ratio(
+              resourceCalculator, required, getMaximumAllocation()
+              );
       
       // Use percentage of node required to bias against large containers...
       // Protect against corner case where you need the whole node with
@@ -1052,7 +1080,7 @@ public class LeafQueue implements CSQueu
             " app.#re-reserve=" + application.getReReservations(priority) + 
             " reserved=" + reservedContainers + 
             " nodeFactor=" + nodeFactor + 
-            " minAllocFactor=" + minimumAllocationFactor +
+            " minAllocFactor=" + getMinimumAllocationFactor() +
             " starvation=" + starvation);
       }
     }
@@ -1069,7 +1097,8 @@ public class LeafQueue implements CSQueu
     assigned = 
         assignNodeLocalContainers(clusterResource, node, application, priority,
             reservedContainer); 
-    if (Resources.greaterThan(assigned, Resources.none())) {
+    if (Resources.greaterThan(resourceCalculator, clusterResource, 
+            assigned, Resources.none())) {
       return new CSAssignment(assigned, NodeType.NODE_LOCAL);
     }
 
@@ -1077,7 +1106,8 @@ public class LeafQueue implements CSQueu
     assigned = 
         assignRackLocalContainers(clusterResource, node, application, priority, 
             reservedContainer);
-    if (Resources.greaterThan(assigned, Resources.none())) {
+    if (Resources.greaterThan(resourceCalculator, clusterResource, 
+            assigned, Resources.none())) {
       return new CSAssignment(assigned, NodeType.RACK_LOCAL);
     }
     
@@ -1231,7 +1261,8 @@ public class LeafQueue implements CSQueu
 
     Resource available = node.getAvailableResource();
 
-    assert (available.getMemory() >  0);
+    assert Resources.greaterThan(
+        resourceCalculator, clusterResource, available, Resources.none());
 
     // Create the container if necessary
     Container container = 
@@ -1239,12 +1270,13 @@ public class LeafQueue implements CSQueu
   
     // something went wrong getting/creating the container 
     if (container == null) {
+      LOG.warn("Couldn't get container for allocation!");
       return Resources.none();
     }
 
     // Can we allocate a container on this node?
     int availableContainers = 
-        available.getMemory() / capability.getMemory();         
+        resourceCalculator.computeAvailableContainers(available, capability);
     if (availableContainers > 0) {
       // Allocate...
 
@@ -1267,8 +1299,9 @@ public class LeafQueue implements CSQueu
       // Inform the application
       RMContainer allocatedContainer = 
           application.allocate(type, node, priority, request, container);
+
+      // Does the application need this resource?
       if (allocatedContainer == null) {
-        // Did the application need this resource?
         return Resources.none();
       }
 
@@ -1379,7 +1412,7 @@ public class LeafQueue implements CSQueu
     // Update queue metrics
     Resources.addTo(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
-        this, getParent(), clusterResource, minimumAllocation);
+        resourceCalculator, this, getParent(), clusterResource, minimumAllocation);
     ++numContainers;
 
     // Update user metrics
@@ -1404,7 +1437,8 @@ public class LeafQueue implements CSQueu
     // Update queue metrics
     Resources.subtractFrom(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
-        this, getParent(), clusterResource, minimumAllocation);
+        resourceCalculator, this, getParent(), clusterResource, 
+        minimumAllocation);
     --numContainers;
 
     // Update user metrics
@@ -1423,10 +1457,12 @@ public class LeafQueue implements CSQueu
     // Update queue properties
     maxActiveApplications = 
         CSQueueUtils.computeMaxActiveApplications(
+            resourceCalculator,
             clusterResource, minimumAllocation, 
             maxAMResourcePerQueuePercent, absoluteMaxCapacity);
     maxActiveAppsUsingAbsCap = 
         CSQueueUtils.computeMaxActiveApplications(
+            resourceCalculator,
             clusterResource, minimumAllocation, 
             maxAMResourcePerQueuePercent, absoluteCapacity);
     maxActiveApplicationsPerUser = 
@@ -1435,7 +1471,8 @@ public class LeafQueue implements CSQueu
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
-        this, getParent(), clusterResource, minimumAllocation);
+        resourceCalculator, this, getParent(), clusterResource, 
+        minimumAllocation);
     
     // Update application properties
     for (FiCaSchedulerApp application : activeApplications) {
@@ -1452,7 +1489,7 @@ public class LeafQueue implements CSQueu
   }
 
   static class User {
-    Resource consumed = Resources.createResource(0);
+    Resource consumed = Resources.createResource(0, 0);
     int pendingApplications = 0;
     int activeApplications = 0;
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Jan  9 05:08:23 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -74,8 +75,7 @@ public class ParentQueue implements CSQu
   private final Set<CSQueue> childQueues;
   private final Comparator<CSQueue> queueComparator;
   
-  private Resource usedResources = 
-    Resources.createResource(0);
+  private Resource usedResources = Resources.createResource(0, 0);
   
   private final boolean rootQueue;
   
@@ -96,14 +96,16 @@ public class ParentQueue implements CSQu
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
+  private final ResourceCalculator resourceCalculator;
+  
   public ParentQueue(CapacitySchedulerContext cs, 
-      String queueName, Comparator<CSQueue> comparator, 
-      CSQueue parent, CSQueue old) {
+      String queueName, CSQueue parent, CSQueue old) {
     minimumAllocation = cs.getMinimumResourceCapability();
     
     this.parent = parent;
     this.queueName = queueName;
     this.rootQueue = (parent == null);
+    this.resourceCalculator = cs.getResourceCalculator();
 
     // must be called after parent and queueName is set
     this.metrics = old != null ? old.getMetrics() :
@@ -143,7 +145,7 @@ public class ParentQueue implements CSQu
         capacity, absoluteCapacity, 
         maximumCapacity, absoluteMaxCapacity, state, acls);
     
-    this.queueComparator = comparator;
+    this.queueComparator = cs.getQueueComparator();
     this.childQueues = new TreeSet<CSQueue>(queueComparator);
 
     LOG.info("Initialized parent-queue " + queueName + 
@@ -182,7 +184,7 @@ public class ParentQueue implements CSQu
 
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        resourceCalculator, this, parent, clusterResource, minimumAllocation);
 
     LOG.info(queueName +
         ", capacity=" + capacity +
@@ -357,7 +359,7 @@ public class ParentQueue implements CSQu
         "numChildQueue= " + childQueues.size() + ", " + 
         "capacity=" + capacity + ", " +  
         "absoluteCapacity=" + absoluteCapacity + ", " +
-        "usedResources=" + usedResources.getMemory() + "MB, " + 
+        "usedResources=" + usedResources + 
         "usedCapacity=" + getUsedCapacity() + ", " + 
         "numApps=" + getNumApplications() + ", " + 
         "numContainers=" + getNumContainers();
@@ -540,9 +542,9 @@ public class ParentQueue implements CSQu
   public synchronized CSAssignment assignContainers(
       Resource clusterResource, FiCaSchedulerNode node) {
     CSAssignment assignment = 
-        new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
+        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
-    while (canAssign(node)) {
+    while (canAssign(clusterResource, node)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign containers to child-queue of "
           + getQueueName());
@@ -559,8 +561,9 @@ public class ParentQueue implements CSQu
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
-      if (Resources.greaterThan(assignedToChild.getResource(), 
-              Resources.none())) {
+      if (Resources.greaterThan(
+              resourceCalculator, clusterResource, 
+              assignedToChild.getResource(), Resources.none())) {
         // Track resource utilization for the parent-queue
         allocateResource(clusterResource, assignedToChild.getResource());
         
@@ -603,11 +606,14 @@ public class ParentQueue implements CSQu
 
   private synchronized boolean assignToQueue(Resource clusterResource) {
     // Check how of the cluster's absolute capacity we are currently using...
-    float currentCapacity = 
-      (float)(usedResources.getMemory()) / clusterResource.getMemory();
+    float currentCapacity =
+        Resources.divide(
+            resourceCalculator, clusterResource, 
+            usedResources, clusterResource);
+    
     if (currentCapacity >= absoluteMaxCapacity) {
       LOG.info(getQueueName() + 
-          " used=" + usedResources.getMemory() + 
+          " used=" + usedResources + 
           " current-capacity (" + currentCapacity + ") " +
           " >= max-capacity (" + absoluteMaxCapacity + ")");
       return false;
@@ -616,16 +622,16 @@ public class ParentQueue implements CSQu
 
   }
   
-  private boolean canAssign(FiCaSchedulerNode node) {
+  private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
     return (node.getReservedContainer() == null) && 
-        Resources.greaterThanOrEqual(node.getAvailableResource(), 
-                                     minimumAllocation);
+        Resources.greaterThanOrEqual(resourceCalculator, clusterResource, 
+            node.getAvailableResource(), minimumAllocation);
   }
   
   synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
       FiCaSchedulerNode node) {
     CSAssignment assignment = 
-        new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
+        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
     printChildQueues();
 
@@ -640,11 +646,13 @@ public class ParentQueue implements CSQu
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 
-          assignment.getResource().getMemory() + ", " + assignment.getType());
+          assignment.getResource() + ", " + assignment.getType());
       }
 
       // If we do assign, remove the queue and re-insert in-order to re-sort
-      if (Resources.greaterThan(assignment.getResource(), Resources.none())) {
+      if (Resources.greaterThan(
+              resourceCalculator, cluster, 
+              assignment.getResource(), Resources.none())) {
         // Remove and re-insert to sort
         iter.remove();
         LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + 
@@ -705,7 +713,7 @@ public class ParentQueue implements CSQu
       Resource resource) {
     Resources.addTo(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        resourceCalculator, this, parent, clusterResource, minimumAllocation);
     ++numContainers;
   }
   
@@ -713,7 +721,7 @@ public class ParentQueue implements CSQu
       Resource resource) {
     Resources.subtractFrom(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        resourceCalculator, this, parent, clusterResource, minimumAllocation);
     --numContainers;
   }
 
@@ -726,7 +734,7 @@ public class ParentQueue implements CSQu
     
     // Update metrics
     CSQueueUtils.updateQueueStatistics(
-        this, parent, clusterResource, minimumAllocation);
+        resourceCalculator, this, parent, clusterResource, minimumAllocation);
   }
   
   @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Wed Jan  9 05:08:23 2013
@@ -65,6 +65,7 @@ public class FiCaSchedulerNode extends S
   public FiCaSchedulerNode(RMNode node) {
     this.rmNode = node;
     this.availableResource.setMemory(node.getTotalCapability().getMemory());
+    this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
   }
 
   public RMNode getRMNode() {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Wed Jan  9 05:08:23 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 public class FSLeafQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Wed Jan  9 05:08:23 2013
@@ -28,7 +28,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 public class FSParentQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Wed Jan  9 05:08:23 2013
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -637,6 +636,33 @@ public class FairScheduler implements Re
         " cluster capacity: " + clusterCapacity);
   }
 
+  /**
+   * Utility method to normalize a list of resource requests, by ensuring that
+   * the memory for each request is a multiple of minMemory and is not zero.
+   *
+   * @param asks a list of resource requests
+   * @param minMemory the configured minimum memory allocation
+   */
+  static void normalizeRequests(List<ResourceRequest> asks,
+      int minMemory) {
+    for (ResourceRequest ask : asks) {
+      normalizeRequest(ask, minMemory);
+    }
+  }
+
+  /**
+   * Utility method to normalize a resource request, by ensuring that the
+   * requested memory is a multiple of minMemory and is not zero.
+   *
+   * @param ask the resource request
+   * @param minMemory the configured minimum memory allocation
+   */
+  static void normalizeRequest(ResourceRequest ask, int minMemory) {
+    int memory = Math.max(ask.getCapability().getMemory(), minMemory);
+    ask.getCapability().setMemory(
+        minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0)));
+  }
+
   @Override
   public Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release) {
@@ -650,7 +676,7 @@ public class FairScheduler implements Re
     }
 
     // Sanity check
-    SchedulerUtils.normalizeRequests(ask, minimumAllocation.getMemory());
+    normalizeRequests(ask, minimumAllocation.getMemory());
 
     // Release containers
     for (ContainerId releasedContainerId : release) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Wed Jan  9 05:08:23 2013
@@ -42,7 +42,6 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Resources.java?rev=1430682&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Resources.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Resources.java Wed Jan  9 05:08:23 2013
@@ -0,0 +1,150 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+@Evolving
+public class Resources {
+  
+  // Java doesn't have const :(
+  private static final Resource NONE = new Resource() {
+
+    @Override
+    public int getMemory() {
+      return 0;
+    }
+
+    @Override
+    public void setMemory(int memory) {
+      throw new RuntimeException("NONE cannot be modified!");
+    }
+
+    @Override
+    public int getVirtualCores() {
+      return 0;
+    }
+
+    @Override
+    public void setVirtualCores(int cores) {
+      throw new RuntimeException("NONE cannot be modified!");
+    }
+
+    @Override
+    public int compareTo(Resource o) {
+      int diff = 0 - o.getMemory();
+      if (diff == 0) {
+        diff = 0 - o.getVirtualCores();
+      }
+      return diff;
+    }
+    
+  };
+
+  public static Resource createResource(int memory) {
+    return createResource(memory, (memory > 0) ? 1 : 0);
+  }
+
+  public static Resource createResource(int memory, int cores) {
+    Resource resource = Records.newRecord(Resource.class);
+    resource.setMemory(memory);
+    resource.setVirtualCores(cores);
+    return resource;
+  }
+
+  public static Resource none() {
+    return NONE;
+  }
+
+  public static Resource clone(Resource res) {
+    return createResource(res.getMemory(), res.getVirtualCores());
+  }
+
+  public static Resource addTo(Resource lhs, Resource rhs) {
+    lhs.setMemory(lhs.getMemory() + rhs.getMemory());
+    return lhs;
+  }
+
+  public static Resource add(Resource lhs, Resource rhs) {
+    return addTo(clone(lhs), rhs);
+  }
+
+  public static Resource subtractFrom(Resource lhs, Resource rhs) {
+    lhs.setMemory(lhs.getMemory() - rhs.getMemory());
+    return lhs;
+  }
+
+  public static Resource subtract(Resource lhs, Resource rhs) {
+    return subtractFrom(clone(lhs), rhs);
+  }
+
+  public static Resource negate(Resource resource) {
+    return subtract(NONE, resource);
+  }
+
+  public static Resource multiplyTo(Resource lhs, int by) {
+    lhs.setMemory(lhs.getMemory() * by);
+    return lhs;
+  }
+
+  public static Resource multiply(Resource lhs, int by) {
+    return multiplyTo(clone(lhs), by);
+  }
+  
+  /**
+   * Mutliply a resource by a {@code double}. Note that integral 
+   * resource quantites are subject to rounding during cast.
+   */
+  public static Resource multiply(Resource lhs, double by) {
+    Resource out = clone(lhs);
+    out.setMemory((int) (lhs.getMemory() * by));
+    return out;
+  }
+
+  public static boolean equals(Resource lhs, Resource rhs) {
+    return lhs.getMemory() == rhs.getMemory();
+  }
+
+  public static boolean lessThan(Resource lhs, Resource rhs) {
+    return lhs.getMemory() < rhs.getMemory();
+  }
+
+  public static boolean lessThanOrEqual(Resource lhs, Resource rhs) {
+    return lhs.getMemory() <= rhs.getMemory();
+  }
+
+  public static boolean greaterThan(Resource lhs, Resource rhs) {
+    return lhs.getMemory() > rhs.getMemory();
+  }
+
+  public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
+    return lhs.getMemory() >= rhs.getMemory();
+  }
+  
+  public static Resource min(Resource lhs, Resource rhs) {
+    return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs;
+  }
+
+  public static Resource max(Resource lhs, Resource rhs) {
+    return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs;
+  }}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java Wed Jan  9 05:08:23 2013
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 /**
  * Utility class containing scheduling algorithms used in the fair scheduler.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Jan  9 05:08:23 2013
@@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -114,6 +116,8 @@ public class FifoScheduler implements Re
 
   private static final String DEFAULT_QUEUE_NAME = "default";
   private QueueMetrics metrics;
+  
+  private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
 
   private final Queue DEFAULT_QUEUE = new Queue() {
     @Override
@@ -226,7 +230,8 @@ public class FifoScheduler implements Re
     }
 
     // Sanity check
-    SchedulerUtils.normalizeRequests(ask, minimumAllocation.getMemory());
+    SchedulerUtils.normalizeRequests(ask, resourceCalculator, 
+        clusterResource, minimumAllocation);
 
     // Release containers
     for (ContainerId releasedContainer : release) {
@@ -371,7 +376,8 @@ public class FifoScheduler implements Re
       application.showRequests();
 
       // Done
-      if (Resources.lessThan(node.getAvailableResource(), minimumAllocation)) {
+      if (Resources.lessThan(resourceCalculator, clusterResource,
+              node.getAvailableResource(), minimumAllocation)) {
         break;
       }
     }
@@ -588,8 +594,8 @@ public class FifoScheduler implements Re
           completedContainer, RMContainerEventType.FINISHED);
     }
 
-    if (Resources.greaterThanOrEqual(node.getAvailableResource(),
-        minimumAllocation)) {
+    if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
+            node.getAvailableResource(),minimumAllocation)) {
       LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
           " available resource = " + node.getAvailableResource());
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Wed Jan  9 05:08:23 2013
@@ -77,14 +77,14 @@ public class NodeManager implements Cont
     new HashMap<ApplicationId, List<Container>>();
   
   public NodeManager(String hostName, int containerManagerPort, int httpPort,
-      String rackName, int memory,
+      String rackName, Resource capability,
       ResourceTrackerService resourceTrackerService, RMContext rmContext)
       throws IOException {
     this.containerManagerAddress = hostName + ":" + containerManagerPort;
     this.nodeHttpAddress = hostName + ":" + httpPort;
     this.rackName = rackName;
     this.resourceTrackerService = resourceTrackerService;
-    this.capability = Resources.createResource(memory);
+    this.capability = capability;
     Resources.addTo(available, capability);
 
     this.nodeId = recordFactory.newRecordInstance(NodeId.class);
@@ -102,8 +102,10 @@ public class NodeManager implements Cont
         this.nodeId));
    
     // Sanity check
-    Assert.assertEquals(memory, 
+    Assert.assertEquals(capability.getMemory(), 
        schedulerNode.getAvailableResource().getMemory());
+    Assert.assertEquals(capability.getVirtualCores(), 
+        schedulerNode.getAvailableResource().getVirtualCores());
   }
   
   public String getHostName() {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java Wed Jan  9 05:08:23 2013
@@ -168,7 +168,7 @@ public class TestApplicationACLs {
 
     ContainerLaunchContext amContainer = recordFactory
         .newRecordInstance(ContainerLaunchContext.class);
-    Resource resource = BuilderUtils.newResource(1024);
+    Resource resource = BuilderUtils.newResource(1024, 1);
     amContainer.setResource(resource);
     amContainer.setApplicationACLs(acls);
     context.setAMContainerSpec(amContainer);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Wed Jan  9 05:08:23 2013
@@ -304,9 +304,9 @@ public class TestClientRMTokens {
 
   private static ResourceScheduler createMockScheduler(Configuration conf) {
     ResourceScheduler mockSched = mock(ResourceScheduler.class);
-    doReturn(BuilderUtils.newResource(512)).when(mockSched)
+    doReturn(BuilderUtils.newResource(512, 0)).when(mockSched)
         .getMinimumResourceCapability();
-    doReturn(BuilderUtils.newResource(5120)).when(mockSched)
+    doReturn(BuilderUtils.newResource(5120, 0)).when(mockSched)
         .getMaximumResourceCapability();
     return mockSched;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Wed Jan  9 05:08:23 2013
@@ -265,13 +265,13 @@ public class TestFifoScheduler {
     // Ask for a 1 GB container for app 1
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
-        BuilderUtils.newResource(GB), 1));
+        BuilderUtils.newResource(GB, 1), 1));
     fs.allocate(appAttemptId1, ask1, emptyId);
 
     // Ask for a 2 GB container for app 2
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
     ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
-        BuilderUtils.newResource(2 * GB), 1));
+        BuilderUtils.newResource(2 * GB, 1), 1));
     fs.allocate(appAttemptId2, ask2, emptyId);
     
     // Trigger container assignment

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Wed Jan  9 05:08:23 2013
@@ -55,9 +55,9 @@ public class TestResourceManager {
 
   private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
       registerNode(String hostName, int containerManagerPort, int httpPort,
-          String rackName, int memory) throws IOException {
+          String rackName, Resource capability) throws IOException {
     return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
-        hostName, containerManagerPort, httpPort, rackName, memory,
+        hostName, containerManagerPort, httpPort, rackName, capability,
         resourceManager.getResourceTrackerService(), resourceManager
             .getRMContext());
   }
@@ -71,13 +71,15 @@ public class TestResourceManager {
     // Register node1
     String host1 = "host1";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = 
-      registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, memory);
+      registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
+          Resources.createResource(memory, 1));
     nm1.heartbeat();
     
     // Register node2
     String host2 = "host2";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = 
-      registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, memory/2);
+      registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
+          Resources.createResource(memory/2, 1));
     nm2.heartbeat();
 
     // Submit an application
@@ -89,7 +91,7 @@ public class TestResourceManager {
     
     // Application resource requirements
     final int memory1 = 1024;
-    Resource capability1 = Resources.createResource(memory1);
+    Resource capability1 = Resources.createResource(memory1, 1);
     Priority priority1 = 
       org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1);
     application.addResourceRequestSpec(priority1, capability1);
@@ -98,7 +100,7 @@ public class TestResourceManager {
     application.addTask(t1);
         
     final int memory2 = 2048;
-    Resource capability2 = Resources.createResource(memory2);
+    Resource capability2 = Resources.createResource(memory2, 1);
     Priority priority0 = 
       org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); // higher
     application.addResourceRequestSpec(priority0, capability2);
@@ -161,7 +163,8 @@ public class TestResourceManager {
     String host1 = "host1";
     final int memory = 4 * 1024;
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = 
-      registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, memory);
+      registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
+          Resources.createResource(memory, 1));
     nm1.heartbeat();
     nm1.heartbeat();
     Collection<RMNode> values = resourceManager.getRMContext().getRMNodes().values();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java Wed Jan  9 05:08:23 2013
@@ -65,7 +65,7 @@ public class TestRMContainerImpl {
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
     ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
 
-    Resource resource = BuilderUtils.newResource(512);
+    Resource resource = BuilderUtils.newResource(512, 1);
     Priority priority = BuilderUtils.newPriority(5);
 
     Container container = BuilderUtils.newContainer(containerId, nodeId,
@@ -135,7 +135,7 @@ public class TestRMContainerImpl {
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
     ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
 
-    Resource resource = BuilderUtils.newResource(512);
+    Resource resource = BuilderUtils.newResource(512, 1);
     Priority priority = BuilderUtils.newPriority(5);
 
     Container container = BuilderUtils.newContainer(containerId, nodeId,

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Wed Jan  9 05:08:23 2013
@@ -34,7 +34,6 @@ import org.apache.hadoop.metrics2.Metric
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -67,7 +66,7 @@ public class TestQueueMetrics {
     MetricsSource userSource = userSource(ms, queueName, user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0);
 
-    metrics.setAvailableResourcesToQueue(Resource.createResource(100*GB));
+    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
     metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Wed Jan  9 05:08:23 2013
@@ -20,38 +20,77 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.junit.Assert.assertEquals;
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.junit.Test;
 
 public class TestSchedulerUtils {
 
   @Test
   public void testNormalizeRequest() {
-    int minMemory = 1024;
+    ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+    
+    final int minMemory = 1024;
+    Resource minResource = Resources.createResource(minMemory, 0);
+    
     ResourceRequest ask = new ResourceRequestPBImpl();
 
     // case negative memory
-    ask.setCapability(Resource.createResource(-1024));
-    SchedulerUtils.normalizeRequest(ask, minMemory);
+    ask.setCapability(Resources.createResource(-1024));
+    SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
     assertEquals(minMemory, ask.getCapability().getMemory());
 
     // case zero memory
-    ask.setCapability(Resource.createResource(0));
-    SchedulerUtils.normalizeRequest(ask, minMemory);
+    ask.setCapability(Resources.createResource(0));
+    SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
     assertEquals(minMemory, ask.getCapability().getMemory());
 
     // case memory is a multiple of minMemory
-    ask.setCapability(Resource.createResource(2 * minMemory));
-    SchedulerUtils.normalizeRequest(ask, minMemory);
+    ask.setCapability(Resources.createResource(2 * minMemory));
+    SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
     assertEquals(2 * minMemory, ask.getCapability().getMemory());
 
     // case memory is not a multiple of minMemory
-    ask.setCapability(Resource.createResource(minMemory + 10));
-    SchedulerUtils.normalizeRequest(ask, minMemory);
+    ask.setCapability(Resources.createResource(minMemory + 10));
+    SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
     assertEquals(2 * minMemory, ask.getCapability().getMemory());
 
   }
+  
+  @Test
+  public void testNormalizeRequestWithDominantResourceCalculator() {
+    ResourceCalculator resourceCalculator = new DominantResourceCalculator();
+    
+    Resource minResource = Resources.createResource(1024, 1);
+    Resource clusterResource = Resources.createResource(10 * 1024, 10);
+    
+    ResourceRequest ask = new ResourceRequestPBImpl();
 
+    // case negative memory/vcores
+    ask.setCapability(Resources.createResource(-1024, -1));
+    SchedulerUtils.normalizeRequest(
+        ask, resourceCalculator, clusterResource, minResource);
+    assertEquals(minResource, ask.getCapability());
+
+    // case zero memory/vcores
+    ask.setCapability(Resources.createResource(0, 0));
+    SchedulerUtils.normalizeRequest(
+        ask, resourceCalculator, clusterResource, minResource);
+    assertEquals(minResource, ask.getCapability());
+    assertEquals(1, ask.getCapability().getVirtualCores());
+    assertEquals(1024, ask.getCapability().getMemory());
+
+    // case non-zero memory & zero cores
+    ask.setCapability(Resources.createResource(1536, 0));
+    SchedulerUtils.normalizeRequest(
+        ask, resourceCalculator, clusterResource, minResource);
+    assertEquals(Resources.createResource(2048, 1), ask.getCapability());
+    assertEquals(1, ask.getCapability().getVirtualCores());
+    assertEquals(2048, ask.getCapability().getMemory());
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Wed Jan  9 05:08:23 2013
@@ -17,8 +17,17 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,6 +47,8 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -53,6 +64,8 @@ public class TestApplicationLimits {
 
   LeafQueue queue;
   
+  private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+  
   @Before
   public void setUp() throws IOException {
     CapacitySchedulerConfiguration csConf = 
@@ -64,23 +77,27 @@ public class TestApplicationLimits {
     CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
     when(csContext.getConfiguration()).thenReturn(csConf);
     when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
-    when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
-    when(csContext.getClusterResources()).thenReturn(Resources.createResource(10 * 16 * GB));
-    
+    when(csContext.getMinimumResourceCapability()).
+        thenReturn(Resources.createResource(GB, 1));
+    when(csContext.getMaximumResourceCapability()).
+        thenReturn(Resources.createResource(16*GB, 32));
+    when(csContext.getClusterResources()).
+        thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
+    when(csContext.getApplicationComparator()).
+        thenReturn(CapacityScheduler.applicationComparator);
+    when(csContext.getQueueComparator()).
+        thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getResourceCalculator()).
+        thenReturn(resourceCalculator);
+
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
         CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
             queues, queues, 
-            CapacityScheduler.queueComparator, 
-            CapacityScheduler.applicationComparator, 
             TestUtils.spyHook);
 
     
-    queue = spy(
-        new LeafQueue(csContext, A, root, 
-                      CapacityScheduler.applicationComparator, null)
-        );
+    queue = spy(new LeafQueue(csContext, A, root, null));
 
     // Stub out ACL checks
     doReturn(true).
@@ -132,21 +149,23 @@ public class TestApplicationLimits {
     when(csContext.getConfiguration()).thenReturn(csConf);
     when(csContext.getConf()).thenReturn(conf);
     when(csContext.getMinimumResourceCapability()).
-        thenReturn(Resources.createResource(GB));
+        thenReturn(Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).
-        thenReturn(Resources.createResource(16*GB));
+        thenReturn(Resources.createResource(16*GB, 16));
+    when(csContext.getApplicationComparator()).
+        thenReturn(CapacityScheduler.applicationComparator);
+    when(csContext.getQueueComparator()).
+        thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     
     // Say cluster has 100 nodes of 16G each
-    Resource clusterResource = Resources.createResource(100 * 16 * GB);
+    Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
     when(csContext.getClusterResources()).thenReturn(clusterResource);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
         CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
-            queues, queues, 
-            CapacityScheduler.queueComparator, 
-            CapacityScheduler.applicationComparator,
-            TestUtils.spyHook);
+            queues, queues, TestUtils.spyHook);
 
     LeafQueue queue = (LeafQueue)queues.get(A);
     
@@ -205,8 +224,12 @@ public class TestApplicationLimits {
         );
 
     // should return -1 if per queue setting not set
-    assertEquals((int)csConf.UNDEFINED, csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
-    int expectedMaxApps =  (int)(csConf.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * 
+    assertEquals(
+        (int)CapacitySchedulerConfiguration.UNDEFINED, 
+        csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
+    int expectedMaxApps =  
+        (int)
+        (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * 
         queue.getAbsoluteCapacity());
     assertEquals(expectedMaxApps, queue.getMaxApplications());
 
@@ -215,8 +238,11 @@ public class TestApplicationLimits {
     assertEquals(expectedMaxAppsPerUser, queue.getMaxApplicationsPerUser());
 
     // should default to global setting if per queue setting not set
-    assertEquals((long) csConf.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, 
-        (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath()));
+    assertEquals(
+        (long)CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, 
+        (long)csConf.getMaximumApplicationMasterResourcePerQueuePercent(
+            queue.getQueuePath())
+            );
 
     // Change the per-queue max AM resources percentage.
     csConf.setFloat(
@@ -228,10 +254,7 @@ public class TestApplicationLimits {
     queues = new HashMap<String, CSQueue>();
     root = 
         CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
-            queues, queues, 
-            CapacityScheduler.queueComparator, 
-            CapacityScheduler.applicationComparator,
-            TestUtils.spyHook);
+            queues, queues, TestUtils.spyHook);
     clusterResource = Resources.createResource(100 * 16 * GB);
 
     queue = (LeafQueue)queues.get(A);
@@ -257,10 +280,7 @@ public class TestApplicationLimits {
     queues = new HashMap<String, CSQueue>();
     root = 
         CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
-            queues, queues, 
-            CapacityScheduler.queueComparator, 
-            CapacityScheduler.applicationComparator,
-            TestUtils.spyHook);
+            queues, queues, TestUtils.spyHook);
 
     queue = (LeafQueue)queues.get(A);
     assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
@@ -445,6 +465,11 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB));
+    when(csContext.getApplicationComparator()).
+        thenReturn(CapacityScheduler.applicationComparator);
+    when(csContext.getQueueComparator()).
+        thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     
     // Say cluster has 100 nodes of 16G each
     Resource clusterResource = Resources.createResource(100 * 16 * GB);
@@ -452,10 +477,7 @@ public class TestApplicationLimits {
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CapacityScheduler.parseQueue(csContext, csConf, null, "root", 
-        queues, queues, 
-        CapacityScheduler.queueComparator, 
-        CapacityScheduler.applicationComparator, 
-        TestUtils.spyHook);
+        queues, queues, TestUtils.spyHook);
 
     // Manipulate queue 'a'
     LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
@@ -490,7 +512,7 @@ public class TestApplicationLimits {
 
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0);
-    Resource expectedHeadroom = Resources.createResource(10*16*GB);
+    Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     verify(app_0_0).setHeadroom(eq(expectedHeadroom));
 
     // Submit second application from user_0, check headroom
@@ -528,7 +550,7 @@ public class TestApplicationLimits {
     
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0); // Schedule to compute
-    expectedHeadroom = Resources.createResource(10*16*GB / 2); // changes
+    expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     verify(app_0_0).setHeadroom(eq(expectedHeadroom));
     verify(app_0_1).setHeadroom(eq(expectedHeadroom));
     verify(app_1_0).setHeadroom(eq(expectedHeadroom));
@@ -536,7 +558,7 @@ public class TestApplicationLimits {
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
     queue.assignContainers(clusterResource, node_0); // Schedule to compute
-    expectedHeadroom = Resources.createResource(9*16*GB / 2); // changes
+    expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     verify(app_0_0).setHeadroom(eq(expectedHeadroom));
     verify(app_0_1).setHeadroom(eq(expectedHeadroom));
     verify(app_1_0).setHeadroom(eq(expectedHeadroom));

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Wed Jan  9 05:08:23 2013
@@ -91,10 +91,10 @@ public class TestCapacityScheduler {
   
   private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
       registerNode(String hostName, int containerManagerPort, int httpPort,
-          String rackName, int memory)
+          String rackName, Resource capability)
           throws IOException {
     return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
-        hostName, containerManagerPort, httpPort, rackName, memory,
+        hostName, containerManagerPort, httpPort, rackName, capability,
         resourceManager.getResourceTrackerService(), resourceManager
             .getRMContext());
   }  
@@ -107,13 +107,15 @@ public class TestCapacityScheduler {
     // Register node1
     String host_0 = "host_0";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = 
-      registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, 4 * GB);
+      registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
+          Resources.createResource(4 * GB, 1));
     nm_0.heartbeat();
     
     // Register node2
     String host_1 = "host_1";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = 
-      registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 2 * GB);
+      registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
+          Resources.createResource(2 * GB, 1));
     nm_1.heartbeat();
 
     // ResourceRequest priorities
@@ -129,10 +131,10 @@ public class TestCapacityScheduler {
     application_0.addNodeManager(host_0, 1234, nm_0);
     application_0.addNodeManager(host_1, 1234, nm_1);
 
-    Resource capability_0_0 = Resources.createResource(1 * GB);
+    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
     application_0.addResourceRequestSpec(priority_1, capability_0_0);
     
-    Resource capability_0_1 = Resources.createResource(2 * GB);
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
     application_0.addResourceRequestSpec(priority_0, capability_0_1);
 
     Task task_0_0 = new Task(application_0, priority_1, 
@@ -146,10 +148,10 @@ public class TestCapacityScheduler {
     application_1.addNodeManager(host_0, 1234, nm_0);
     application_1.addNodeManager(host_1, 1234, nm_1);
     
-    Resource capability_1_0 = Resources.createResource(3 * GB);
+    Resource capability_1_0 = Resources.createResource(3 * GB, 1);
     application_1.addResourceRequestSpec(priority_1, capability_1_0);
     
-    Resource capability_1_1 = Resources.createResource(2 * GB);
+    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
     application_1.addResourceRequestSpec(priority_0, capability_1_1);
 
     Task task_1_0 = new Task(application_1, priority_1, 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1430682&r1=1430681&r2=1430682&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Wed Jan  9 05:08:23 2013
@@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -90,6 +92,8 @@ public class TestLeafQueue {
   final static int GB = 1024;
   final static String DEFAULT_RACK = "/default";
 
+  private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+  
   @Before
   public void setUp() throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
@@ -108,17 +112,22 @@ public class TestLeafQueue {
     when(csContext.getConfiguration()).thenReturn(csConf);
     when(csContext.getConf()).thenReturn(conf);
     when(csContext.getMinimumResourceCapability()).
-        thenReturn(Resources.createResource(GB));
+        thenReturn(Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).
-        thenReturn(Resources.createResource(16*GB));
+        thenReturn(Resources.createResource(16*GB, 32));
     when(csContext.getClusterResources()).
-        thenReturn(Resources.createResource(100 * 16 * GB));
+        thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
+    when(csContext.getApplicationComparator()).
+    thenReturn(CapacityScheduler.applicationComparator);
+    when(csContext.getQueueComparator()).
+        thenReturn(CapacityScheduler.queueComparator);
+    when(csContext.getResourceCalculator()).
+        thenReturn(resourceCalculator);
+
     root = 
         CapacityScheduler.parseQueue(csContext, csConf, null, 
             CapacitySchedulerConfiguration.ROOT, 
             queues, queues, 
-            CapacityScheduler.queueComparator, 
-            CapacityScheduler.applicationComparator, 
             TestUtils.spyHook);
 
     cs.reinitialize(csConf, rmContext);
@@ -266,7 +275,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
     
     final int numNodes = 1;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
     // Setup resource-requests
@@ -387,7 +397,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
     
     final int numNodes = 1;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
     // Setup resource-requests
@@ -519,7 +530,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
     
     final int numNodes = 2;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
  
     // Setup resource-requests
@@ -612,7 +624,7 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
     
     final int numNodes = 2;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
  
     // Setup resource-requests
@@ -728,7 +740,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
     
     final int numNodes = 1;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
     
     // Setup resource-requests
@@ -889,7 +902,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
     
     final int numNodes = 2;
-    Resource clusterResource = Resources.createResource(numNodes * (4*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (4*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
     
     // Setup resource-requests
@@ -990,7 +1004,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
 
     final int numNodes = 3;
-    Resource clusterResource = Resources.createResource(numNodes * (4*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (4*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
     // Setup resource-requests
@@ -1090,11 +1105,13 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
     
     final int numNodes = 3;
-    Resource clusterResource = Resources.createResource(numNodes * (4*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (4*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
     when(csContext.getMaximumResourceCapability()).thenReturn(
-        Resources.createResource(4*GB));
-    when(a.getMaximumAllocation()).thenReturn(Resources.createResource(4*GB));
+        Resources.createResource(4*GB, 16));
+    when(a.getMaximumAllocation()).thenReturn(
+        Resources.createResource(4*GB, 16));
     when(a.getMinimumAllocationFactor()).thenReturn(0.25f); // 1G / 4G 
     
     // Setup resource-requests
@@ -1204,7 +1221,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
 
     final int numNodes = 3;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
     
     // Setup resource-requests and submit
@@ -1344,7 +1362,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
 
     final int numNodes = 3;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (8*GB), 1);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
     
     // Setup resource-requests and submit
@@ -1473,7 +1492,8 @@ public class TestLeafQueue {
     FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
     
     final int numNodes = 3;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = Resources.createResource(
+        numNodes * (8*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
     // Setup resource-requests and submit