You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/06/03 19:36:21 UTC

svn commit: r1489072 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ hadoop-yarn/hadoop-yarn-server/ha...

Author: tucu
Date: Mon Jun  3 17:36:20 2013
New Revision: 1489072

URL: http://svn.apache.org/r1489072
Log:
YARN-326. Add multi-resource scheduling to the fair scheduler. (sandyr via tucu)

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
      - copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
      - copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
      - copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
      - copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/
      - copied from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
      - copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Mon Jun  3 17:36:20 2013
@@ -88,6 +88,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-392. Make it possible to specify hard locality constraints in resource
     requests. (sandyr via tucu)
 
+    YARN-326. Add multi-resource scheduling to the fair scheduler. 
+    (sandyr via tucu)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java Mon Jun  3 17:36:20 2013
@@ -32,4 +32,8 @@ public class AllocationConfigurationExce
   public AllocationConfigurationException(String message) {
     super(message);
   }
+  
+  public AllocationConfigurationException(String message, Throwable t) {
+    super(message, t);
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Mon Jun  3 17:36:20 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -119,7 +120,7 @@ public class AppSchedulable extends Sche
   }
 
   @Override
-  public double getWeight() {
+  public ResourceWeights getWeights() {
     return scheduler.getAppWeight(this);
   }
 
@@ -237,10 +238,7 @@ public class AppSchedulable extends Sche
     }
 
     // Can we allocate a container on this node?
-    int availableContainers =
-        available.getMemory() / capability.getMemory();
-
-    if (availableContainers > 0) {
+    if (Resources.fitsIn(capability, available)) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
           app.allocate(type, node, priority, request, container);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Mon Jun  3 17:36:20 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
@@ -84,7 +85,7 @@ public abstract class FSQueue extends Sc
       throws AllocationConfigurationException;
 
   @Override
-  public double getWeight() {
+  public ResourceWeights getWeights() {
     return queueMgr.getQueueWeight(getName());
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Mon Jun  3 17:36:20 2013
@@ -180,8 +180,8 @@ public class FSSchedulerNode extends Sch
   @Override
   public String toString() {
     return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +  
-      " available=" + getAvailableResource().getMemory() + 
-      " used=" + getUsedResource().getMemory();
+      " available=" + getAvailableResource() + 
+      " used=" + getUsedResource();
   }
 
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Jun  3 17:36:20 2013
@@ -57,8 +57,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -495,14 +497,14 @@ public class FairScheduler implements Re
   }
 
   // synchronized for sizeBasedWeight
-  public synchronized double getAppWeight(AppSchedulable app) {
+  public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
     if (!app.getRunnable()) {
       // Job won't launch tasks, but don't return 0 to avoid division errors
-      return 1.0;
+      return ResourceWeights.NEUTRAL;
     } else {
       double weight = 1.0;
       if (sizeBasedWeight) {
-        // Set weight based on current demand
+        // Set weight based on current memory demand
         weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
       }
       weight *= app.getPriority().getPriority();
@@ -510,7 +512,7 @@ public class FairScheduler implements Re
         // Run weight through the user-supplied weightAdjuster
         weight = weightAdjuster.adjustWeight(app, weight);
       }
-      return weight;
+      return new ResourceWeights((float)weight);
     }
   }
 
@@ -714,37 +716,6 @@ public class FairScheduler implements Re
         " cluster capacity: " + clusterCapacity);
   }
 
-  /**
-   * Utility method to normalize a list of resource requests, by ensuring that
-   * the memory for each request is a multiple of minMemory and is not zero.
-   *
-   * @param asks a list of resource requests
-   * @param minMemory the configured minimum memory allocation
-   * @param maxMemory the configured maximum memory allocation
-   */
-  static void normalizeRequests(List<ResourceRequest> asks,
-      int minMemory, int maxMemory) {
-    for (ResourceRequest ask : asks) {
-      normalizeRequest(ask, minMemory, maxMemory);
-    }
-  }
-
-  /**
-   * Utility method to normalize a resource request, by ensuring that the
-   * requested memory is a multiple of minMemory and is not zero.
-   *
-   * @param ask the resource request
-   * @param minMemory the configured minimum memory allocation
-   * @param maxMemory the configured maximum memory allocation
-   */
-  static void normalizeRequest(ResourceRequest ask, int minMemory,
-      int maxMemory) {
-    int memory = Math.max(ask.getCapability().getMemory(), minMemory);
-    int normalizedMemory =
-        minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
-    ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory));
-  }
-
   @Override
   public Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release) {
@@ -758,8 +729,8 @@ public class FairScheduler implements Re
     }
 
     // Sanity check
-    normalizeRequests(ask, minimumAllocation.getMemory(),
-        maximumAllocation.getMemory());
+    SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
+        clusterCapacity, minimumAllocation, maximumAllocation);
 
     // Release containers
     for (ContainerId releasedContainerId : release) {
@@ -1015,8 +986,8 @@ public class FairScheduler implements Re
   public synchronized void reinitialize(Configuration conf, RMContext rmContext)
       throws IOException {
     this.conf = new FairSchedulerConfiguration(conf);
-    minimumAllocation = this.conf.getMinimumMemoryAllocation();
-    maximumAllocation = this.conf.getMaximumMemoryAllocation();
+    minimumAllocation = this.conf.getMinimumAllocation();
+    maximumAllocation = this.conf.getMaximumAllocation();
     userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
     nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
     rackLocalityThreshold = this.conf.getLocalityThresholdRack();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Mon Jun  3 17:36:20 2013
@@ -18,12 +18,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @Private
 @Evolving
@@ -78,18 +82,24 @@ public class FairSchedulerConfiguration 
     addResource(FS_CONFIGURATION_FILE);
   }
 
-  public Resource getMinimumMemoryAllocation() {
+  public Resource getMinimumAllocation() {
     int mem = getInt(
         YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-    return Resources.createResource(mem);
+    int cpu = getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+    return Resources.createResource(mem, cpu);
   }
 
-  public Resource getMaximumMemoryAllocation() {
+  public Resource getMaximumAllocation() {
     int mem = getInt(
         YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    return Resources.createResource(mem);
+    int cpu = getInt(
+        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    return Resources.createResource(mem, cpu);
   }
 
   public boolean getUserAsDefaultQueue() {
@@ -136,4 +146,34 @@ public class FairSchedulerConfiguration 
   public int getWaitTimeBeforeKill() {
     return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
   }
+  
+  /**
+   * Parses a resource config value of a form like "1024", "1024 mb",
+   * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.
+   * 
+   * @throws AllocationConfigurationException
+   */
+  public static Resource parseResourceConfigValue(String val)
+      throws AllocationConfigurationException {
+    try {
+      int memory = findResource(val, "mb");
+      int vcores = findResource(val, "vcores");
+      return BuilderUtils.newResource(memory, vcores);
+    } catch (AllocationConfigurationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new AllocationConfigurationException(
+          "Error reading resource config", ex);
+    }
+  }
+  
+  private static int findResource(String val, String units)
+    throws AllocationConfigurationException {
+    Pattern pattern = Pattern.compile("(\\d+) ?" + units);
+    Matcher matcher = pattern.matcher(val);
+    if (!matcher.find()) {
+      throw new AllocationConfigurationException("Missing resource: " + units);
+    }
+    return Integer.parseInt(matcher.group(1));
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Mon Jun  3 17:36:20 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -301,7 +302,7 @@ public class QueueManager {
     Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
     Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
     Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
-    Map<String, Double> queueWeights = new HashMap<String, Double>();
+    Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     Map<String, Map<QueueACL, AccessControlList>> queueAcls =
@@ -415,7 +416,7 @@ public class QueueManager {
    */
   private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
+      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile) 
@@ -433,12 +434,12 @@ public class QueueManager {
       Element field = (Element) fieldNode;
       if ("minResources".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        int val = Integer.parseInt(text);
-        minQueueResources.put(queueName, Resources.createResource(val));
+        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+        minQueueResources.put(queueName, val);
       } else if ("maxResources".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        int val = Integer.parseInt(text);
-        maxQueueResources.put(queueName, Resources.createResource(val));
+        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+        maxQueueResources.put(queueName, val);
       } else if ("maxRunningApps".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
@@ -446,7 +447,7 @@ public class QueueManager {
       } else if ("weight".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         double val = Double.parseDouble(text);
-        queueWeights.put(queueName, val);
+        queueWeights.put(queueName, new ResourceWeights((float)val));
       } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         long val = Long.parseLong(text) * 1000L;
@@ -454,7 +455,9 @@ public class QueueManager {
       } else if ("schedulingPolicy".equals(field.getTagName())
           || "schedulingMode".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        queuePolicies.put(queueName, SchedulingPolicy.parse(text));
+        SchedulingPolicy policy = SchedulingPolicy.parse(text);
+        policy.initialize(scheduler.getClusterCapacity());
+        queuePolicies.put(queueName, policy);
       } else if ("aclSubmitApps".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -510,13 +513,20 @@ public class QueueManager {
   }
 
   /**
-   * Get a collection of all queues
+   * Get a collection of all leaf queues
    */
   public Collection<FSLeafQueue> getLeafQueues() {
     synchronized (queues) {
       return leafQueues;
     }
   }
+  
+  /**
+   * Get a collection of all queues
+   */
+  public Collection<FSQueue> getQueues() {
+    return queues.values();
+  }
 
   public int getUserMaxApps(String user) {
     // save current info in case it gets changed under us
@@ -538,12 +548,12 @@ public class QueueManager {
     }
   }
   
-  public double getQueueWeight(String queue) {
-    Double weight = info.queueWeights.get(queue);
+  public ResourceWeights getQueueWeight(String queue) {
+    ResourceWeights weight = info.queueWeights.get(queue);
     if (weight != null) {
       return weight;
     } else {
-      return 1.0;
+      return ResourceWeights.NEUTRAL;
     }
   }
 
@@ -595,7 +605,7 @@ public class QueueManager {
     // Maximum amount of resources per queue
     public final Map<String, Resource> maxQueueResources;
     // Sharing weights for each queue
-    public final Map<String, Double> queueWeights;
+    public final Map<String, ResourceWeights> queueWeights;
     
     // Max concurrent running applications for each queue and for each user; in addition,
     // for users that have no max specified, we use the userMaxJobsDefault.
@@ -625,7 +635,7 @@ public class QueueManager {
     public QueueManagerInfo(Map<String, Resource> minQueueResources, 
         Map<String, Resource> maxQueueResources, 
         Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-        Map<String, Double> queueWeights, int userMaxAppsDefault,
+        Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
         int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, 
         Map<String, Long> minSharePreemptionTimeouts, 
         Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@@ -647,7 +657,7 @@ public class QueueManager {
     public QueueManagerInfo() {
       minQueueResources = new HashMap<String, Resource>();
       maxQueueResources = new HashMap<String, Resource>();
-      queueWeights = new HashMap<String, Double>();
+      queueWeights = new HashMap<String, ResourceWeights>();
       queueMaxApps = new HashMap<String, Integer>();
       userMaxApps = new HashMap<String, Integer>();
       userMaxAppsDefault = Integer.MAX_VALUE;

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Mon Jun  3 17:36:20 2013
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 /**
@@ -80,7 +81,7 @@ public abstract class Schedulable {
 
 
   /** Job/queue weight in fair sharing. */
-  public abstract double getWeight();
+  public abstract ResourceWeights getWeights();
 
   /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
   public abstract long getStartTime();
@@ -110,7 +111,7 @@ public abstract class Schedulable {
   /** Convenient toString implementation for debugging. */
   @Override
   public String toString() {
-    return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]",
-        getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+    return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
+        getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Mon Jun  3 17:36:20 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 
@@ -67,11 +68,12 @@ public abstract class SchedulingPolicy {
   /**
    * Returns {@link SchedulingPolicy} instance corresponding to the
    * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
-   * FairsharePolicy or "fifo" for FifoPolicy. For custom
+   * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for
+   * DominantResourceFairnessPolicy. For a custom
    * {@link SchedulingPolicy}s in the RM classpath, the policy should be
    * canonical class name of the {@link SchedulingPolicy}.
    * 
-   * @param policy canonical class name or "fair" or "fifo"
+   * @param policy canonical class name or "drf" or "fair" or "fifo"
    * @throws AllocationConfigurationException
    */
   @SuppressWarnings("unchecked")
@@ -80,10 +82,12 @@ public abstract class SchedulingPolicy {
     @SuppressWarnings("rawtypes")
     Class clazz;
     String text = policy.toLowerCase();
-    if (text.equals("fair")) {
+    if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
       clazz = FairSharePolicy.class;
-    } else if (text.equals("fifo")) {
+    } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
       clazz = FifoPolicy.class;
+    } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
+      clazz = DominantResourceFairnessPolicy.class;
     } else {
       try {
         clazz = Class.forName(policy);
@@ -98,6 +102,8 @@ public abstract class SchedulingPolicy {
     }
     return getInstance(clazz);
   }
+  
+  public void initialize(Resource clusterCapacity) {}
 
   /**
    * @return returns the name of {@link SchedulingPolicy}

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Mon Jun  3 17:36:20 2013
@@ -25,17 +25,21 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Makes scheduling decisions by trying to equalize shares of memory.
+ */
 @Private
 @Unstable
 public class FairSharePolicy extends SchedulingPolicy {
   @VisibleForTesting
-  public static final String NAME = "Fairshare";
+  public static final String NAME = "fair";
   private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
       new DefaultResourceCalculator();
   private FairShareComparator comparator = new FairShareComparator();
@@ -79,8 +83,10 @@ public class FairSharePolicy extends Sch
           / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
           / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
-      useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
-      useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+      useToWeightRatio1 = s1.getResourceUsage().getMemory() /
+          s1.getWeights().getWeight(ResourceType.MEMORY);
+      useToWeightRatio2 = s2.getResourceUsage().getMemory() /
+          s2.getWeights().getWeight(ResourceType.MEMORY);
       int res = 0;
       if (s1Needy && !s2Needy)
         res = -1;
@@ -220,7 +226,7 @@ public class FairSharePolicy extends Sch
    * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
    */
   private static Resource computeShare(Schedulable sched, double r2sRatio) {
-    double share = sched.getWeight() * r2sRatio;
+    double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio;
     share = Math.max(share, sched.getMinShare().getMemory());
     share = Math.min(share, sched.getDemand().getMemory());
     return Resources.createResource((int) share);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Mon Jun  3 17:36:20 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -30,7 +31,7 @@ public class FakeSchedulable extends Sch
   private Resource demand;
   private Resource usage;
   private Resource minShare;
-  private double weight;
+  private ResourceWeights weights;
   private Priority priority;
   private long startTime;
   
@@ -46,21 +47,22 @@ public class FakeSchedulable extends Sch
     this(demand, minShare, 1, 0, 0, 0);
   }
   
-  public FakeSchedulable(int demand, int minShare, double weight) {
-    this(demand, minShare, weight, 0, 0, 0);
+  public FakeSchedulable(int demand, int minShare, double memoryWeight) {
+    this(demand, minShare, memoryWeight, 0, 0, 0);
   }
   
   public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage,
       long startTime) {
-    this(Resources.createResource(demand), Resources.createResource(minShare), weight, 
-        Resources.createResource(fairShare), Resources.createResource(usage), startTime);
+    this(Resources.createResource(demand), Resources.createResource(minShare),
+        new ResourceWeights((float)weight), Resources.createResource(fairShare),
+        Resources.createResource(usage), startTime);
   }
   
-  public FakeSchedulable(Resource demand, Resource minShare, double weight, Resource fairShare,
-      Resource usage, long startTime) {
+  public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight,
+      Resource fairShare, Resource usage, long startTime) {
     this.demand = demand;
     this.minShare = minShare;
-    this.weight = weight;
+    this.weights = weight;
     setFairShare(fairShare);
     this.usage = usage;
     this.priority = Records.newRecord(Priority.class);
@@ -98,8 +100,8 @@ public class FakeSchedulable extends Sch
   }
   
   @Override
-  public double getWeight() {
-    return weight;
+  public ResourceWeights getWeights() {
+    return weights;
   }
   
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Mon Jun  3 17:36:20 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
@@ -147,12 +148,17 @@ public class TestFairScheduler {
         ApplicationAttemptId.newInstance(appIdImpl, attemptId);
     return attId;
   }
-
-
+  
   private ResourceRequest createResourceRequest(int memory, String host,
       int priority, int numContainers, boolean relaxLocality) {
+    return createResourceRequest(memory, 1, host, priority, numContainers,
+        relaxLocality);
+  }
+
+  private ResourceRequest createResourceRequest(int memory, int vcores, String host,
+      int priority, int numContainers, boolean relaxLocality) {
     ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
-    request.setCapability(Resources.createResource(memory));
+    request.setCapability(BuilderUtils.newResource(memory, vcores));
     request.setResourceName(host);
     request.setNumContainers(numContainers);
     Priority prio = recordFactory.newRecordInstance(Priority.class);
@@ -170,18 +176,34 @@ public class TestFairScheduler {
       String userId) {
     return createSchedulingRequest(memory, queueId, userId, 1);
   }
+  
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, 1);
+  }
 
   private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
       String userId, int numContainers) {
     return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
   }
+  
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
+  }
 
   private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
       String userId, int numContainers, int priority) {
+    return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
+        priority);
+  }
+  
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId, int numContainers, int priority) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(id, queueId, userId);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
+    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
         priority, numContainers, true);
     ask.add(request);
     scheduler.allocate(id, ask,  new ArrayList<ContainerId>());
@@ -451,10 +473,10 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
@@ -569,11 +591,11 @@ public class TestFairScheduler {
     out.println("<allocations>");
     // Give queue A a minimum of 1024 M
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     // Give queue B a minimum of 2048 M
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
     out.println("</queue>");
     // Give queue C no minimum
@@ -613,9 +635,9 @@ public class TestFairScheduler {
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
 
-    assertEquals(Resources.createResource(1024),
+    assertEquals(Resources.createResource(1024, 0),
         queueManager.getMinResources("root.queueA"));
-    assertEquals(Resources.createResource(2048),
+    assertEquals(Resources.createResource(2048, 0),
         queueManager.getMinResources("root.queueB"));
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root.queueC"));
@@ -672,15 +694,15 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<queue name=\"queueC\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueD\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</queue>");
     out.println("</allocations>");
@@ -710,11 +732,11 @@ public class TestFairScheduler {
     out.println("<allocations>");
     // Give queue A a minimum of 1024 M
     out.println("<pool name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</pool>");
     // Give queue B a minimum of 2048 M
     out.println("<pool name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
     out.println("</pool>");
     // Give queue C no minimum
@@ -754,9 +776,9 @@ public class TestFairScheduler {
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
 
-    assertEquals(Resources.createResource(1024),
+    assertEquals(Resources.createResource(1024, 0),
         queueManager.getMinResources("root.queueA"));
-    assertEquals(Resources.createResource(2048),
+    assertEquals(Resources.createResource(2048, 0),
         queueManager.getMinResources("root.queueB"));
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root.queueC"));
@@ -812,10 +834,10 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
@@ -825,7 +847,7 @@ public class TestFairScheduler {
 
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
@@ -885,7 +907,7 @@ public class TestFairScheduler {
 
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
@@ -963,19 +985,19 @@ public class TestFairScheduler {
 
     // Create four nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
             "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
     RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
             "127.0.0.3");
     NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
     scheduler.handle(nodeEvent3);
@@ -1106,19 +1128,19 @@ public class TestFairScheduler {
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueC\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueD\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
     out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
@@ -1130,19 +1152,19 @@ public class TestFairScheduler {
 
     // Create four nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
             "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
     RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
             "127.0.0.3");
     NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
     scheduler.handle(nodeEvent3);
@@ -1206,19 +1228,19 @@ public class TestFairScheduler {
     // After minSharePreemptionTime has passed, they should want to preempt min
     // share.
     clock.tick(6);
-    assertTrue(Resources.equals(
-        Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-        Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
+    assertEquals(
+        1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+    assertEquals(
+        1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
 
     // After fairSharePreemptionTime has passed, they should want to preempt
     // fair share.
     scheduler.update();
     clock.tick(6);
-    assertTrue(Resources.equals(
-        Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-        Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
+    assertEquals(
+        1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+    assertEquals(
+        1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
   }
   
   @Test (timeout = 5000)
@@ -1271,7 +1293,7 @@ public class TestFairScheduler {
     // Add a node
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
     
@@ -1443,7 +1465,7 @@ public class TestFairScheduler {
   public void testFifoWithinQueue() throws Exception {
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
     
@@ -1488,7 +1510,7 @@ public class TestFairScheduler {
         .setPolicy(SchedulingPolicy.getDefault());
 
     RMNode node =
-        MockNodes.newNodeInfo(1, Resources.createResource(16384), 0,
+        MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
@@ -1536,10 +1558,10 @@ public class TestFairScheduler {
 
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
     RMNode node2 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 2, "127.0.0.2");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 2, "127.0.0.2");
 
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
@@ -1685,7 +1707,8 @@ public class TestFairScheduler {
   public void testRemoveNodeUpdatesRootQueueMetrics() {
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
     
-    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1,
+        "127.0.0.1");
     NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(addEvent);
     
@@ -1824,4 +1847,157 @@ public class TestFairScheduler {
     scheduler.handle(nodeUpdateEvent);
     assertEquals(0, app.getReservedContainers().size());
   }
+  
+  public void testNoMoreCpuOnNode() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
+        1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    
+    ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
+        "user1", 2);
+    FSSchedulerApp app = scheduler.applications.get(attId);
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+
+  public void testBasicDRFAssignment() throws Exception {
+    RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
+        "user1", 2);
+    FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+    ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
+        "user1", 2);
+    FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+
+    DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+    drfPolicy.initialize(scheduler.getClusterCapacity());
+    scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+    scheduler.update();
+
+    // First both apps get a container
+    // Then the first gets another container because its dominant share of
+    // 2048/8192 is less than the other's of 2/5
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    Assert.assertEquals(0, app2.getLiveContainers().size());
+
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(2, app1.getLiveContainers().size());
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+  }
+
+  /**
+   * Two apps on one queue, one app on another
+   */
+  @Test
+  public void testBasicDRFWithQueues() throws Exception {
+    RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
+        1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
+        "user1", 2);
+    FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+    ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
+        "user1", 2);
+    FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+    ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
+        "user1", 2);
+    FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
+    
+    DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+    drfPolicy.initialize(scheduler.getClusterCapacity());
+    scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
+    scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app3.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(2, app3.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+  }
+  
+  @Test
+  public void testDRFHierarchicalQueues() throws Exception {
+    RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
+        1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
+        "user1", 2);
+    Thread.sleep(3); // so that start times will be different
+    FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+    ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
+        "user1", 2);
+    Thread.sleep(3); // so that start times will be different
+    FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+    ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
+        "user1", 2);
+    Thread.sleep(3); // so that start times will be different
+    FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
+    ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
+        "user1", 2);
+    Thread.sleep(3); // so that start times will be different
+    FSSchedulerApp app4 = scheduler.applications.get(appAttId4);
+    
+    DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+    drfPolicy.initialize(scheduler.getClusterCapacity());
+    scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
+    scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+    scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(updateEvent);
+    // app1 gets first container because it asked first
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    // app4 gets second container because it's on queue2
+    Assert.assertEquals(1, app4.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    // app4 gets another container because queue2's dominant share of memory
+    // is still less than queue1's of cpu
+    Assert.assertEquals(2, app4.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    // app3 gets one because queue1 gets one and queue1.subqueue2 is behind
+    // queue1.subqueue1
+    Assert.assertEquals(1, app3.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    // app4 would get another one, but it doesn't have any requests
+    // queue1.subqueue2 is still using less than queue1.subqueue1, so it
+    // gets another
+    Assert.assertEquals(2, app3.getLiveContainers().size());
+    // queue1.subqueue1 is behind again, so it gets one, which it gives to app2
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+    
+    // at this point, we've used all our CPU up, so nobody else should get a container
+    scheduler.handle(updateEvent);
+
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+    Assert.assertEquals(2, app3.getLiveContainers().size());
+    Assert.assertEquals(2, app4.getLiveContainers().size());
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java Mon Jun  3 17:36:20 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.junit.Test;
@@ -49,6 +50,11 @@ public class TestSchedulingPolicy {
     assertTrue("Invalid scheduler name",
         sm.getName().equals(FairSharePolicy.NAME));
 
+    // Shortname - drf
+    sm = SchedulingPolicy.parse("drf");
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(DominantResourceFairnessPolicy.NAME));
+    
     // Shortname - fair
     sm = SchedulingPolicy.parse("fair");
     assertTrue("Invalid scheduler name",
@@ -93,7 +99,20 @@ public class TestSchedulingPolicy {
         SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
     assertTrue(ERR,
         SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
-
+    
+    // drf
+    policy = SchedulingPolicy.parse("drf"); 
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
+    assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
+        SchedulingPolicy.DEPTH_INTERMEDIATE));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
+    
     policy = Mockito.mock(SchedulingPolicy.class);
     Mockito.when(policy.getApplicableDepth()).thenReturn(
         SchedulingPolicy.DEPTH_PARENT);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Mon Jun  3 17:36:20 2013
@@ -31,17 +31,18 @@ Hadoop MapReduce Next Generation - Fair 
 * {Introduction}
 
   Fair scheduling is a method of assigning resources to applications such that 
-  all apps get, on average, an equal share of resources over time. 
-  Hadoop NextGen is capable of scheduling multiple resource types, such as 
-  Memory and CPU. Currently only memory is supported, so a "cluster share" is 
-  a proportion of aggregate memory in the cluster. When there is a single app 
-  running, that app uses the entire cluster. When other apps are submitted, 
-  resources that free up are assigned to the new apps, so that each app gets 
-  roughly the same amount of resources. Unlike the default Hadoop scheduler, 
-  which forms a queue of apps, this lets short apps finish in reasonable time
-  while not starving long-lived apps. It is also a reasonable way to share a 
-  cluster between a number of users. Finally, fair sharing can also work with 
-  app priorities - the priorities are used as weights to determine the 
+  all apps get, on average, an equal share of resources over time.
+  Hadoop NextGen is capable of scheduling multiple resource types. By default,
+  the Fair Scheduler bases scheduling fairness decisions only on memory. It
+  can be configured to schedule with both memory and CPU, using the notion
+  of Dominant Resource Fairness developed by Ghodsi et al. When there is a
+  single app running, that app uses the entire cluster. When other apps are
+  submitted, resources that free up are assigned to the new apps, so that each
+  app eventually on gets roughly the same amount of resources. Unlike the default
+  Hadoop scheduler, which forms a queue of apps, this lets short apps finish in
+  reasonable time while not starving long-lived apps. It is also a reasonable way
+  to share a cluster between a number of users. Finally, fair sharing can also
+  work with app priorities - the priorities are used as weights to determine the 
   fraction of total resources that each app should get.
 
   The scheduler organizes apps further into "queues", and shares resources 
@@ -49,9 +50,10 @@ Hadoop MapReduce Next Generation - Fair 
   called “default”. If an app specifically lists a queue in a container 
   resource request, the request is submitted to that queue. It is also 
   possible to assign queues based on the user name included with the request 
-  through configuration. Within each queue, fair sharing is used to share 
-  capacity between the running apps. queues can also be given weights to share 
-  the cluster non-proportionally in the config file.
+  through configuration. Within each queue, a scheduling policy is used to share
+  resources between the running apps. The default is memory-based fair sharing,
+  but FIFO and multi-resource with Dominant Resource Fairness can also be
+  configured. Queues can be configured with weights to share the cluster non-evenly.
 
   The fair scheduler supports hierarchical queues. All queues descend from a
   queue named "root". Available resources are distributed among the children
@@ -120,14 +122,6 @@ Hadoop MapReduce Next Generation - Fair 
      queues and their properties, in addition to certain policy defaults. This file
      must be in XML format as described in the next section.
 
- * <<<yarn.scheduler.fair.minimum-allocation-mb>>>
-
-    * The smallest container size the scheduler can allocate, in MB of memory.
-
- * <<<yarn.scheduler.fair.maximum-allocation-mb>>>
-
-    * The largest container the scheduler can allocate, in MB of memory.
-
  * <<<yarn.scheduler.fair.user-as-default-queue>>>
 
     * Whether to use the username associated with the allocation as the default 
@@ -183,17 +177,23 @@ Allocation file format
  * <<Queue elements>>, which represent queues. Each may contain the following
      properties:
 
-   * minResources: minimum MB of aggregate memory the queue expects. If a queue
-     demands resources, and its current allocation is below its configured minimum,
-     it will be assigned available resources before any queue that is not in this
-     situation.  If multiple queues are in this situation, resources go to the
-     queue with the smallest ratio between allocation and minimum. Note that it is
-     possible that a queue that is below its minimum may not immediately get up to
-     its minimum when it submits an application, because already-running jobs may
-     be using those resources.
-
-   * maxResources: maximum MB of aggregate memory a queue is allowed.  A queue
-     will never be assigned a container that would put it over this limit.
+   * minResources: minimum resources the queue is entitled to, in the form
+     "X mb, Y vcores". If a queue's minimum share is not satisfied, it will be
+     offered available resources before any other queue under the same parent.
+     Under the single-resource fairness policy, a queue
+     is considered unsatisfied if its memory usage is below its minimum memory
+     share. Under dominant resource fairness, a queue is considered unsatisfied
+     if its usage for its dominant resource with respect to the cluster capacity
+     is below its minimum share for that resource. If multiple queues are
+     unsatisfied in this situation, resources go to the queue with the smallest
+     ratio between relevant resource usage and minimum. Note that it is
+     possible that a queue that is below its minimum may not immediately get up
+     to its minimum when it submits an application, because already-running jobs
+     may be using those resources.
+
+   * maxResources: maximum resources a queue is allowed, in the form
+     "X mb, Y vcores". A queue will never be assigned a container that would
+     put its aggregate usage over this limit.
 
    * maxRunningApps: limit the number of apps from the queue to run at once
 
@@ -232,13 +232,13 @@ Allocation file format
 <?xml version="1.0"?>
 <allocations>
   <queue name="sample_queue">
-    <minResources>10000</minResources>
-    <maxResources>90000</maxResources>
+    <minResources>10000 mb</minResources>
+    <maxResources>90000 mb</maxResources>
     <maxRunningApps>50</maxRunningApps>
     <weight>2.0</weight>
     <schedulingMode>fair</schedulingMode>
     <queue name="sample_sub_queue">
-      <minResources>5000</minResources>
+      <minResources>5000 mb</minResources>
     </queue>
   </queue>
   <user name="sample_user">