You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/06/03 19:33:56 UTC

svn commit: r1489070 [1/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ hadoop-yarn/hadoop-yarn-server/hadoop-y...

Author: tucu
Date: Mon Jun  3 17:33:55 2013
New Revision: 1489070

URL: http://svn.apache.org/r1489070
Log:
YARN-326. Add multi-resource scheduling to the fair scheduler. (sandyr via tucu)

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Jun  3 17:33:55 2013
@@ -108,6 +108,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-392. Make it possible to specify hard locality constraints in resource
     requests. (sandyr via tucu)
 
+    YARN-326. Add multi-resource scheduling to the fair scheduler. 
+    (sandyr via tucu)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java?rev=1489070&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java Mon Jun  3 17:33:55 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+/** Resource kinds; ResourceWeights keeps one weight slot per constant. */
+@Private
+@Evolving
+public enum ResourceType {
+  MEMORY, CPU
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java?rev=1489070&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java Mon Jun  3 17:33:55 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+/** Holds one float weight per ResourceType constant. */
+@Private
+@Evolving
+public class ResourceWeights {
+  public static final ResourceWeights NEUTRAL = new ResourceWeights(1.0f);
+  // NOTE: NEUTRAL above is a shared instance; setWeight() on it changes it for all users.
+  private float[] weights = new float[ResourceType.values().length];
+  // Builds weights from explicit memory and CPU values (array index = ordinal()).
+  public ResourceWeights(float memoryWeight, float cpuWeight) {
+    weights[ResourceType.MEMORY.ordinal()] = memoryWeight;
+    weights[ResourceType.CPU.ordinal()] = cpuWeight;
+  }
+  // Builds weights that hold the same value for every resource type.
+  public ResourceWeights(float weight) {
+    for (int i = 0; i < weights.length; i++) {
+      weights[i] = weight;
+    }
+  }
+  // Builds weights that are all 0.0f (the default contents of a new float[]).
+  public ResourceWeights() { }
+  // Overwrites the weight stored for the given resource type.
+  public void setWeight(ResourceType resourceType, float weight) {
+    weights[resourceType.ordinal()] = weight;
+  }
+  // Returns the weight stored for the given resource type.
+  public float getWeight(ResourceType resourceType) {
+    return weights[resourceType.ordinal()];
+  }
+  // Renders e.g. "<memory weight=1.0, cpu weight=1.0>"; case and format use the default locale.
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<");
+    for (int i = 0; i < ResourceType.values().length; i++) {
+      if (i != 0) {
+        sb.append(", ");
+      }
+      ResourceType resourceType = ResourceType.values()[i];
+      sb.append(resourceType.name().toLowerCase());
+      sb.append(String.format(" weight=%.1f", getWeight(resourceType)));
+    }
+    sb.append(">");
+    return sb.toString();
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java Mon Jun  3 17:33:55 2013
@@ -32,4 +32,8 @@ public class AllocationConfigurationExce
   public AllocationConfigurationException(String message) {
     super(message);
   }
+  // Forwards both the message and the Throwable t to the superclass constructor.
+  public AllocationConfigurationException(String message, Throwable t) {
+    super(message, t);
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Mon Jun  3 17:33:55 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -119,7 +120,7 @@ public class AppSchedulable extends Sche
   }
 
   @Override
-  public double getWeight() {
+  public ResourceWeights getWeights() {
     return scheduler.getAppWeight(this);
   }
 
@@ -237,10 +238,7 @@ public class AppSchedulable extends Sche
     }
 
     // Can we allocate a container on this node?
-    int availableContainers =
-        available.getMemory() / capability.getMemory();
-
-    if (availableContainers > 0) {
+    if (Resources.fitsIn(capability, available)) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
           app.allocate(type, node, priority, request, container);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Mon Jun  3 17:33:55 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
@@ -84,7 +85,7 @@ public abstract class FSQueue extends Sc
       throws AllocationConfigurationException;
 
   @Override
-  public double getWeight() {
+  public ResourceWeights getWeights() {
     return queueMgr.getQueueWeight(getName());
   }
   

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Mon Jun  3 17:33:55 2013
@@ -180,8 +180,8 @@ public class FSSchedulerNode extends Sch
   @Override
   public String toString() {
     return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +  
-      " available=" + getAvailableResource().getMemory() + 
-      " used=" + getUsedResource().getMemory();
+      " available=" + getAvailableResource() + 
+      " used=" + getUsedResource();
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Jun  3 17:33:55 2013
@@ -57,8 +57,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -495,14 +497,14 @@ public class FairScheduler implements Re
   }
 
   // synchronized for sizeBasedWeight
-  public synchronized double getAppWeight(AppSchedulable app) {
+  public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
     if (!app.getRunnable()) {
       // Job won't launch tasks, but don't return 0 to avoid division errors
-      return 1.0;
+      return ResourceWeights.NEUTRAL;
     } else {
       double weight = 1.0;
       if (sizeBasedWeight) {
-        // Set weight based on current demand
+        // Set weight based on current memory demand
         weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
       }
       weight *= app.getPriority().getPriority();
@@ -510,7 +512,7 @@ public class FairScheduler implements Re
         // Run weight through the user-supplied weightAdjuster
         weight = weightAdjuster.adjustWeight(app, weight);
       }
-      return weight;
+      return new ResourceWeights((float)weight);
     }
   }
 
@@ -714,37 +716,6 @@ public class FairScheduler implements Re
         " cluster capacity: " + clusterCapacity);
   }
 
-  /**
-   * Utility method to normalize a list of resource requests, by ensuring that
-   * the memory for each request is a multiple of minMemory and is not zero.
-   *
-   * @param asks a list of resource requests
-   * @param minMemory the configured minimum memory allocation
-   * @param maxMemory the configured maximum memory allocation
-   */
-  static void normalizeRequests(List<ResourceRequest> asks,
-      int minMemory, int maxMemory) {
-    for (ResourceRequest ask : asks) {
-      normalizeRequest(ask, minMemory, maxMemory);
-    }
-  }
-
-  /**
-   * Utility method to normalize a resource request, by ensuring that the
-   * requested memory is a multiple of minMemory and is not zero.
-   *
-   * @param ask the resource request
-   * @param minMemory the configured minimum memory allocation
-   * @param maxMemory the configured maximum memory allocation
-   */
-  static void normalizeRequest(ResourceRequest ask, int minMemory,
-      int maxMemory) {
-    int memory = Math.max(ask.getCapability().getMemory(), minMemory);
-    int normalizedMemory =
-        minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
-    ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory));
-  }
-
   @Override
   public Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release) {
@@ -758,8 +729,8 @@ public class FairScheduler implements Re
     }
 
     // Sanity check
-    normalizeRequests(ask, minimumAllocation.getMemory(),
-        maximumAllocation.getMemory());
+    SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
+        clusterCapacity, minimumAllocation, maximumAllocation);
 
     // Release containers
     for (ContainerId releasedContainerId : release) {
@@ -1015,8 +986,8 @@ public class FairScheduler implements Re
   public synchronized void reinitialize(Configuration conf, RMContext rmContext)
       throws IOException {
     this.conf = new FairSchedulerConfiguration(conf);
-    minimumAllocation = this.conf.getMinimumMemoryAllocation();
-    maximumAllocation = this.conf.getMaximumMemoryAllocation();
+    minimumAllocation = this.conf.getMinimumAllocation();
+    maximumAllocation = this.conf.getMaximumAllocation();
     userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
     nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
     rackLocalityThreshold = this.conf.getLocalityThresholdRack();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Mon Jun  3 17:33:55 2013
@@ -18,12 +18,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @Private
 @Evolving
@@ -78,18 +82,24 @@ public class FairSchedulerConfiguration 
     addResource(FS_CONFIGURATION_FILE);
   }
 
-  public Resource getMinimumMemoryAllocation() {
+  public Resource getMinimumAllocation() {
     int mem = getInt(
         YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-    return Resources.createResource(mem);
+    int cpu = getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+    return Resources.createResource(mem, cpu);
   }
 
-  public Resource getMaximumMemoryAllocation() {
+  public Resource getMaximumAllocation() {
     int mem = getInt(
         YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    return Resources.createResource(mem);
+    int cpu = getInt(
+        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    return Resources.createResource(mem, cpu);
   }
 
   public boolean getUserAsDefaultQueue() {
@@ -136,4 +146,34 @@ public class FairSchedulerConfiguration 
   public int getWaitTimeBeforeKill() {
     return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
   }
+  
+  /**
+   * Parses a resource config value such as "1024 mb, 3 vcores". Both an "mb"
+   * and a "vcores" amount must be present; a bare "1024" is rejected.
+   * 
+   * @throws AllocationConfigurationException if an amount is missing or unreadable
+   */
+  public static Resource parseResourceConfigValue(String val)
+      throws AllocationConfigurationException {
+    try {
+      int memory = findResource(val, "mb");
+      int vcores = findResource(val, "vcores");
+      return BuilderUtils.newResource(memory, vcores);
+    } catch (AllocationConfigurationException ex) {
+      throw ex; // already the declared type; pass through unwrapped
+    } catch (Exception ex) { // any other failure is wrapped, keeping ex as the cause
+      throw new AllocationConfigurationException(
+          "Error reading resource config", ex);
+    }
+  }
+  // Returns the digits of the first "<digits><optional space><units>" found in val.
+  private static int findResource(String val, String units)
+    throws AllocationConfigurationException {
+    Pattern pattern = Pattern.compile("(\\d+) ?" + units); // no flags: units match case-sensitively
+    Matcher matcher = pattern.matcher(val);
+    if (!matcher.find()) {
+      throw new AllocationConfigurationException("Missing resource: " + units);
+    }
+    return Integer.parseInt(matcher.group(1));
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Mon Jun  3 17:33:55 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -301,7 +302,7 @@ public class QueueManager {
     Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
     Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
     Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
-    Map<String, Double> queueWeights = new HashMap<String, Double>();
+    Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     Map<String, Map<QueueACL, AccessControlList>> queueAcls =
@@ -415,7 +416,7 @@ public class QueueManager {
    */
   private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
+      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile) 
@@ -433,12 +434,12 @@ public class QueueManager {
       Element field = (Element) fieldNode;
       if ("minResources".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        int val = Integer.parseInt(text);
-        minQueueResources.put(queueName, Resources.createResource(val));
+        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+        minQueueResources.put(queueName, val);
       } else if ("maxResources".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        int val = Integer.parseInt(text);
-        maxQueueResources.put(queueName, Resources.createResource(val));
+        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+        maxQueueResources.put(queueName, val);
       } else if ("maxRunningApps".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
@@ -446,7 +447,7 @@ public class QueueManager {
       } else if ("weight".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         double val = Double.parseDouble(text);
-        queueWeights.put(queueName, val);
+        queueWeights.put(queueName, new ResourceWeights((float)val));
       } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         long val = Long.parseLong(text) * 1000L;
@@ -454,7 +455,9 @@ public class QueueManager {
       } else if ("schedulingPolicy".equals(field.getTagName())
           || "schedulingMode".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        queuePolicies.put(queueName, SchedulingPolicy.parse(text));
+        SchedulingPolicy policy = SchedulingPolicy.parse(text);
+        policy.initialize(scheduler.getClusterCapacity());
+        queuePolicies.put(queueName, policy);
       } else if ("aclSubmitApps".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -510,13 +513,20 @@ public class QueueManager {
   }
 
   /**
-   * Get a collection of all queues
+   * Get a collection of all leaf queues
    */
   public Collection<FSLeafQueue> getLeafQueues() {
     synchronized (queues) {
       return leafQueues;
     }
   }
+  
+  /**
+   * Get a collection of all queues (the values() view of the queues map)
+   */
+  public Collection<FSQueue> getQueues() {
+    return queues.values();
+  }
 
   public int getUserMaxApps(String user) {
     // save current info in case it gets changed under us
@@ -538,12 +548,12 @@ public class QueueManager {
     }
   }
   
-  public double getQueueWeight(String queue) {
-    Double weight = info.queueWeights.get(queue);
+  public ResourceWeights getQueueWeight(String queue) {
+    ResourceWeights weight = info.queueWeights.get(queue);
     if (weight != null) {
       return weight;
     } else {
-      return 1.0;
+      return ResourceWeights.NEUTRAL;
     }
   }
 
@@ -595,7 +605,7 @@ public class QueueManager {
     // Maximum amount of resources per queue
     public final Map<String, Resource> maxQueueResources;
     // Sharing weights for each queue
-    public final Map<String, Double> queueWeights;
+    public final Map<String, ResourceWeights> queueWeights;
     
     // Max concurrent running applications for each queue and for each user; in addition,
     // for users that have no max specified, we use the userMaxJobsDefault.
@@ -625,7 +635,7 @@ public class QueueManager {
     public QueueManagerInfo(Map<String, Resource> minQueueResources, 
         Map<String, Resource> maxQueueResources, 
         Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-        Map<String, Double> queueWeights, int userMaxAppsDefault,
+        Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
         int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, 
         Map<String, Long> minSharePreemptionTimeouts, 
         Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@@ -647,7 +657,7 @@ public class QueueManager {
     public QueueManagerInfo() {
       minQueueResources = new HashMap<String, Resource>();
       maxQueueResources = new HashMap<String, Resource>();
-      queueWeights = new HashMap<String, Double>();
+      queueWeights = new HashMap<String, ResourceWeights>();
       queueMaxApps = new HashMap<String, Integer>();
       userMaxApps = new HashMap<String, Integer>();
       userMaxAppsDefault = Integer.MAX_VALUE;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Mon Jun  3 17:33:55 2013
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 /**
@@ -80,7 +81,7 @@ public abstract class Schedulable {
 
 
   /** Job/queue weight in fair sharing. */
-  public abstract double getWeight();
+  public abstract ResourceWeights getWeights();
 
   /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
   public abstract long getStartTime();
@@ -110,7 +111,7 @@ public abstract class Schedulable {
   /** Convenient toString implementation for debugging. */
   @Override
   public String toString() {
-    return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]",
-        getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+    return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
+        getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
   }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Mon Jun  3 17:33:55 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 
@@ -67,11 +68,12 @@ public abstract class SchedulingPolicy {
   /**
    * Returns {@link SchedulingPolicy} instance corresponding to the
    * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
-   * FairsharePolicy or "fifo" for FifoPolicy. For custom
+   * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for
+   * DominantResourceFairnessPolicy. For custom
    * {@link SchedulingPolicy}s in the RM classpath, the policy should be
    * canonical class name of the {@link SchedulingPolicy}.
    * 
-   * @param policy canonical class name or "fair" or "fifo"
+   * @param policy canonical class name or "drf" or "fair" or "fifo"
    * @throws AllocationConfigurationException
    */
   @SuppressWarnings("unchecked")
@@ -80,10 +82,12 @@ public abstract class SchedulingPolicy {
     @SuppressWarnings("rawtypes")
     Class clazz;
     String text = policy.toLowerCase();
-    if (text.equals("fair")) {
+    if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
       clazz = FairSharePolicy.class;
-    } else if (text.equals("fifo")) {
+    } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
       clazz = FifoPolicy.class;
+    } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
+      clazz = DominantResourceFairnessPolicy.class;
     } else {
       try {
         clazz = Class.forName(policy);
@@ -98,6 +102,8 @@ public abstract class SchedulingPolicy {
     }
     return getInstance(clazz);
   }
+  
+  public void initialize(Resource clusterCapacity) {}
 
   /**
    * @return returns the name of {@link SchedulingPolicy}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1489070&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Mon Jun  3 17:33:55 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
+
+/**
+ * Makes scheduling decisions by trying to equalize dominant resource usage.
+ * A schedulable's dominant resource usage is the largest ratio of resource
+ * usage to capacity among the resource types it is using.
+ */
+@Private
+@Unstable
+public class DominantResourceFairnessPolicy extends SchedulingPolicy {
+  public static final String NAME = "DRF";
+
+  private final DominantResourceFairnessComparator comparator =
+      new DominantResourceFairnessComparator();
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public byte getApplicableDepth() {
+    return SchedulingPolicy.DEPTH_ANY;
+  }
+
+  @Override
+  public Comparator<Schedulable> getComparator() {
+    return comparator;
+  }
+  
+  @Override
+  public void computeShares(Collection<? extends Schedulable> schedulables,
+      Resource totalResources) {
+    // TODO: For now, set all fair shares to 0, because, in the context of DRF,
+    // it doesn't make sense to set a value for each resource.  YARN-736 should
+    // add in a sensible replacement.
+    for (Schedulable schedulable : schedulables) {
+      schedulable.setFairShare(Resources.none());
+    }
+  }
+  
+  @Override
+  public void initialize(Resource clusterCapacity) {
+    comparator.setClusterCapacity(clusterCapacity);
+  }
+
+  /** Orders schedulables by DRF; capacity must be set before compare(). */
+  public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {
+    private static final int NUM_RESOURCES = ResourceType.values().length;
+    private Resource clusterCapacity;
+
+    public void setClusterCapacity(Resource clusterCapacity) {
+      this.clusterCapacity = clusterCapacity;
+    }
+
+    /**
+     * A schedulable below its min share sorts first; otherwise (or if both
+     * are) smaller shares sort first, with ties broken by earlier start time.
+     */
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      ResourceWeights sharesOfCluster1 = new ResourceWeights();
+      ResourceWeights sharesOfCluster2 = new ResourceWeights();
+      ResourceWeights sharesOfMinShare1 = new ResourceWeights();
+      ResourceWeights sharesOfMinShare2 = new ResourceWeights();
+      ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES];
+      ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES];
+      // Calculate each resource's share of the cluster for both schedulables.
+      calculateShares(s1.getResourceUsage(),
+          clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights());
+      calculateShares(s1.getResourceUsage(),
+          s1.getMinShare(), sharesOfMinShare1, null, ResourceWeights.NEUTRAL);
+      calculateShares(s2.getResourceUsage(),
+          clusterCapacity, sharesOfCluster2, resourceOrder2, s2.getWeights());
+      calculateShares(s2.getResourceUsage(),
+          s2.getMinShare(), sharesOfMinShare2, null, ResourceWeights.NEUTRAL);
+      // A queue is needy for its min share if its dominant resource
+      // (with respect to the cluster capacity) is below its configured min share
+      // for that resource
+      boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f;
+      boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f;
+      int res = 0;
+      if (!s2Needy && !s1Needy) {
+        res = compareShares(sharesOfCluster1, sharesOfCluster2,
+            resourceOrder1, resourceOrder2);
+      } else if (s1Needy && !s2Needy) {
+        res = -1;
+      } else if (s2Needy && !s1Needy) {
+        res = 1;
+      } else { // both are needy below min share
+        res = compareShares(sharesOfMinShare1, sharesOfMinShare2,
+            resourceOrder1, resourceOrder2);
+      }
+      if (res == 0) {
+        // Tied in fairness ratio: break the tie by start time. Compare the
+        // longs directly; casting their difference to int can flip its sign.
+        res = s1.getStartTime() < s2.getStartTime() ? -1
+            : (s1.getStartTime() > s2.getStartTime() ? 1 : 0);
+      }
+      return res;
+    }
+    
+    /**
+     * Calculates and orders a resource's share of a pool in terms of two vectors.
+     * The shares vector contains, for each resource, the fraction of the pool that
+     * it takes up.  The resourceOrder vector contains an ordering of resources
+     * by largest share.  So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>,
+     * shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY].
+     */
+    void calculateShares(Resource resource, Resource pool,
+        ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
+      shares.setWeight(MEMORY, (float)resource.getMemory() /
+          (pool.getMemory() * weights.getWeight(MEMORY)));
+      shares.setWeight(CPU, (float)resource.getVirtualCores() /
+          (pool.getVirtualCores() * weights.getWeight(CPU)));
+      // sort order vector by resource share
+      if (resourceOrder != null) {
+        if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) {
+          resourceOrder[0] = MEMORY;
+          resourceOrder[1] = CPU;
+        } else  {
+          resourceOrder[0] = CPU;
+          resourceOrder[1] = MEMORY;
+        }
+      }
+    }
+    
+    private int compareShares(ResourceWeights shares1, ResourceWeights shares2,
+        ResourceType[] resourceOrder1, ResourceType[] resourceOrder2) {
+      for (int i = 0; i < resourceOrder1.length; i++) {
+        int ret = (int)Math.signum(shares1.getWeight(resourceOrder1[i])
+            - shares2.getWeight(resourceOrder2[i]));
+        if (ret != 0) {
+          return ret;
+        }
+      }
+      return 0;
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Mon Jun  3 17:33:55 2013
@@ -25,17 +25,21 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Makes scheduling decisions by trying to equalize shares of memory.
+ */
 @Private
 @Unstable
 public class FairSharePolicy extends SchedulingPolicy {
   @VisibleForTesting
-  public static final String NAME = "Fairshare";
+  public static final String NAME = "fair";
   private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
       new DefaultResourceCalculator();
   private FairShareComparator comparator = new FairShareComparator();
@@ -79,8 +83,10 @@ public class FairSharePolicy extends Sch
           / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
           / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
-      useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
-      useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+      useToWeightRatio1 = s1.getResourceUsage().getMemory() /
+          s1.getWeights().getWeight(ResourceType.MEMORY);
+      useToWeightRatio2 = s2.getResourceUsage().getMemory() /
+          s2.getWeights().getWeight(ResourceType.MEMORY);
       int res = 0;
       if (s1Needy && !s2Needy)
         res = -1;
@@ -220,7 +226,7 @@ public class FairSharePolicy extends Sch
    * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
    */
   private static Resource computeShare(Schedulable sched, double r2sRatio) {
-    double share = sched.getWeight() * r2sRatio;
+    double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio;
     share = Math.max(share, sched.getMinShare().getMemory());
     share = Math.min(share, sched.getDemand().getMemory());
     return Resources.createResource((int) share);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Mon Jun  3 17:33:55 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -30,7 +31,7 @@ public class FakeSchedulable extends Sch
   private Resource demand;
   private Resource usage;
   private Resource minShare;
-  private double weight;
+  private ResourceWeights weights;
   private Priority priority;
   private long startTime;
   
@@ -46,21 +47,22 @@ public class FakeSchedulable extends Sch
     this(demand, minShare, 1, 0, 0, 0);
   }
   
-  public FakeSchedulable(int demand, int minShare, double weight) {
-    this(demand, minShare, weight, 0, 0, 0);
+  public FakeSchedulable(int demand, int minShare, double memoryWeight) {
+    this(demand, minShare, memoryWeight, 0, 0, 0);
   }
   
   public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage,
       long startTime) {
-    this(Resources.createResource(demand), Resources.createResource(minShare), weight, 
-        Resources.createResource(fairShare), Resources.createResource(usage), startTime);
+    this(Resources.createResource(demand), Resources.createResource(minShare),
+        new ResourceWeights((float)weight), Resources.createResource(fairShare),
+        Resources.createResource(usage), startTime);
   }
   
-  public FakeSchedulable(Resource demand, Resource minShare, double weight, Resource fairShare,
-      Resource usage, long startTime) {
+  public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight,
+      Resource fairShare, Resource usage, long startTime) {
     this.demand = demand;
     this.minShare = minShare;
-    this.weight = weight;
+    this.weights = weight;
     setFairShare(fairShare);
     this.usage = usage;
     this.priority = Records.newRecord(Priority.class);
@@ -98,8 +100,8 @@ public class FakeSchedulable extends Sch
   }
   
   @Override
-  public double getWeight() {
-    return weight;
+  public ResourceWeights getWeights() {
+    return weights;
   }
   
   @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Mon Jun  3 17:33:55 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
@@ -147,12 +148,17 @@ public class TestFairScheduler {
         ApplicationAttemptId.newInstance(appIdImpl, attemptId);
     return attId;
   }
-
-
+  
   private ResourceRequest createResourceRequest(int memory, String host,
       int priority, int numContainers, boolean relaxLocality) {
+    return createResourceRequest(memory, 1, host, priority, numContainers,
+        relaxLocality);
+  }
+
+  private ResourceRequest createResourceRequest(int memory, int vcores, String host,
+      int priority, int numContainers, boolean relaxLocality) {
     ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
-    request.setCapability(Resources.createResource(memory));
+    request.setCapability(BuilderUtils.newResource(memory, vcores));
     request.setResourceName(host);
     request.setNumContainers(numContainers);
     Priority prio = recordFactory.newRecordInstance(Priority.class);
@@ -170,18 +176,34 @@ public class TestFairScheduler {
       String userId) {
     return createSchedulingRequest(memory, queueId, userId, 1);
   }
+  
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, 1);
+  }
 
   private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
       String userId, int numContainers) {
     return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
   }
+  
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
+  }
 
   private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
       String userId, int numContainers, int priority) {
+    return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
+        priority);
+  }
+  
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId, int numContainers, int priority) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(id, queueId, userId);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
+    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
         priority, numContainers, true);
     ask.add(request);
     scheduler.allocate(id, ask,  new ArrayList<ContainerId>());
@@ -451,10 +473,10 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
@@ -569,11 +591,11 @@ public class TestFairScheduler {
     out.println("<allocations>");
     // Give queue A a minimum of 1024 M
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     // Give queue B a minimum of 2048 M
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
     out.println("</queue>");
     // Give queue C no minimum
@@ -613,9 +635,9 @@ public class TestFairScheduler {
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
 
-    assertEquals(Resources.createResource(1024),
+    assertEquals(Resources.createResource(1024, 0),
         queueManager.getMinResources("root.queueA"));
-    assertEquals(Resources.createResource(2048),
+    assertEquals(Resources.createResource(2048, 0),
         queueManager.getMinResources("root.queueB"));
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root.queueC"));
@@ -672,15 +694,15 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<queue name=\"queueC\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueD\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</queue>");
     out.println("</allocations>");
@@ -710,11 +732,11 @@ public class TestFairScheduler {
     out.println("<allocations>");
     // Give queue A a minimum of 1024 M
     out.println("<pool name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</pool>");
     // Give queue B a minimum of 2048 M
     out.println("<pool name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
     out.println("</pool>");
     // Give queue C no minimum
@@ -754,9 +776,9 @@ public class TestFairScheduler {
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
 
-    assertEquals(Resources.createResource(1024),
+    assertEquals(Resources.createResource(1024, 0),
         queueManager.getMinResources("root.queueA"));
-    assertEquals(Resources.createResource(2048),
+    assertEquals(Resources.createResource(2048, 0),
         queueManager.getMinResources("root.queueB"));
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root.queueC"));
@@ -812,10 +834,10 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
@@ -825,7 +847,7 @@ public class TestFairScheduler {
 
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
@@ -885,7 +907,7 @@ public class TestFairScheduler {
 
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
@@ -963,19 +985,19 @@ public class TestFairScheduler {
 
     // Create four nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
             "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
     RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
             "127.0.0.3");
     NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
     scheduler.handle(nodeEvent3);
@@ -1106,19 +1128,19 @@ public class TestFairScheduler {
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueC\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueD\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
     out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
@@ -1130,19 +1152,19 @@ public class TestFairScheduler {
 
     // Create four nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
             "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
     RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
             "127.0.0.3");
     NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
     scheduler.handle(nodeEvent3);
@@ -1206,19 +1228,19 @@ public class TestFairScheduler {
     // After minSharePreemptionTime has passed, they should want to preempt min
     // share.
     clock.tick(6);
-    assertTrue(Resources.equals(
-        Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-        Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
+    assertEquals(
+        1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+    assertEquals(
+        1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
 
     // After fairSharePreemptionTime has passed, they should want to preempt
     // fair share.
     scheduler.update();
     clock.tick(6);
-    assertTrue(Resources.equals(
-        Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-        Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
+    assertEquals(
+        1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+    assertEquals(
+        1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
   }
   
   @Test (timeout = 5000)
@@ -1271,7 +1293,7 @@ public class TestFairScheduler {
     // Add a node
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
     
@@ -1443,7 +1465,7 @@ public class TestFairScheduler {
   public void testFifoWithinQueue() throws Exception {
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
     
@@ -1488,7 +1510,7 @@ public class TestFairScheduler {
         .setPolicy(SchedulingPolicy.getDefault());
 
     RMNode node =
-        MockNodes.newNodeInfo(1, Resources.createResource(16384), 0,
+        MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
@@ -1536,10 +1558,10 @@ public class TestFairScheduler {
 
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
     RMNode node2 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 2, "127.0.0.2");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 2, "127.0.0.2");
 
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
@@ -1685,7 +1707,8 @@ public class TestFairScheduler {
   public void testRemoveNodeUpdatesRootQueueMetrics() {
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
     
-    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1,
+        "127.0.0.1");
     NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(addEvent);
     
@@ -1824,4 +1847,168 @@ public class TestFairScheduler {
     scheduler.handle(nodeUpdateEvent);
     assertEquals(0, app.getReservedContainers().size());
   }
+  
+  /**
+   * A node with spare memory but a single vcore must stop handing out
+   * containers once that vcore is in use.
+   */
+  @Test
+  public void testNoMoreCpuOnNode() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
+        1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    
+    ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
+        "user1", 2);
+    FSSchedulerApp app = scheduler.applications.get(attId);
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+    // Memory would fit a second 1024 MB container, but the only vcore is taken
+    scheduler.handle(updateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+
+  /**
+   * Two apps share one queue under DRF; containers are handed out according
+   * to each app's dominant resource share.
+   */
+  @Test
+  public void testBasicDRFAssignment() throws Exception {
+    RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
+        "user1", 2);
+    FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+    ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
+        "user1", 2);
+    FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+
+    DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+    drfPolicy.initialize(scheduler.getClusterCapacity());
+    scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+    scheduler.update();
+
+    // First both apps get a container
+    // Then the first gets another container because its dominant share of
+    // 2048/8192 is less than the other's of 2/5
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    Assert.assertEquals(0, app2.getLiveContainers().size());
+
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(2, app1.getLiveContainers().size());
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+  }
+
+  /**
+   * Two apps on one queue (queue1), one app on another (queue2); DRF on root and queue1
+   */
+  @Test
+  public void testBasicDRFWithQueues() throws Exception {
+    RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
+        1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
+        "user1", 2);
+    FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+    ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
+        "user1", 2);
+    FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+    ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
+        "user1", 2);
+    FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
+    
+    DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+    drfPolicy.initialize(scheduler.getClusterCapacity());
+    scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
+    scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(updateEvent); // 1st node update: app1 (queue1) is expected to be served
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    scheduler.handle(updateEvent); // 2nd node update: app3 (queue2) is expected to be served
+    Assert.assertEquals(1, app3.getLiveContainers().size());
+    scheduler.handle(updateEvent); // 3rd node update: app3 again
+    Assert.assertEquals(2, app3.getLiveContainers().size());
+    scheduler.handle(updateEvent); // 4th node update: back to queue1, this time app2
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+  }
+  
+  @Test
+  public void testDRFHierarchicalQueues() throws Exception { // DRF on root, queue1 and queue1.subqueue1
+    RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
+        1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
+        "user1", 2); // NOTE(review): 3074 MB rather than 3072 - confirm this is intentional
+    Thread.sleep(3); // so that start times will be different
+    FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+    ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
+        "user1", 2);
+    Thread.sleep(3); // so that start times will be different
+    FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+    ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
+        "user1", 2);
+    Thread.sleep(3); // so that start times will be different
+    FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
+    ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
+        "user1", 2);
+    Thread.sleep(3); // so that start times will be different
+    FSSchedulerApp app4 = scheduler.applications.get(appAttId4);
+    
+    DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+    drfPolicy.initialize(scheduler.getClusterCapacity());
+    scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
+    scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+    scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(updateEvent);
+    // app1 gets first container because it asked first
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    // app4 gets second container because it's on queue2
+    Assert.assertEquals(1, app4.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    // app4 gets another container because queue2's dominant share of memory
+    // is still less than queue1's of cpu
+    Assert.assertEquals(2, app4.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    // app3 gets one because queue1 gets one and queue1.subqueue2 is behind
+    // queue1.subqueue1
+    Assert.assertEquals(1, app3.getLiveContainers().size());
+    scheduler.handle(updateEvent);
+    // app4 would get another one, but it doesn't have any requests
+    // queue1.subqueue2 is still using less than queue1.subqueue1, so it
+    // gets another
+    Assert.assertEquals(2, app3.getLiveContainers().size());
+    // queue1.subqueue1 is behind again, so it gets one, which it gives to app2
+    scheduler.handle(updateEvent);
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+    
+    // at this point, we've used all our CPU up, so nobody else should get a container
+    scheduler.handle(updateEvent); // vcores in use: 1*1 + 1*3 + 2*2 + 2*2 = 12 of the node's 12
+
+    Assert.assertEquals(1, app1.getLiveContainers().size());
+    Assert.assertEquals(1, app2.getLiveContainers().size());
+    Assert.assertEquals(2, app3.getLiveContainers().size());
+    Assert.assertEquals(2, app4.getLiveContainers().size());
+  }
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java?rev=1489070&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java Mon Jun  3 17:33:55 2013
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestFairSchedulerConfiguration { // exercises the statically imported parseResourceConfigValue
+  @Test
+  public void testParseResourceConfigValue() throws Exception { // order and spacing must not matter
+    assertEquals(BuilderUtils.newResource(1024, 2),
+        parseResourceConfigValue("2 vcores, 1024 mb")); // vcores first, with spaces
+    assertEquals(BuilderUtils.newResource(1024, 2),
+        parseResourceConfigValue("1024 mb, 2 vcores")); // memory first, with spaces
+    assertEquals(BuilderUtils.newResource(1024, 2),
+        parseResourceConfigValue("2vcores,1024mb")); // vcores first, no spaces
+    assertEquals(BuilderUtils.newResource(1024, 2),
+        parseResourceConfigValue("1024mb,2vcores")); // memory first, no spaces
+  }
+  
+  @Test(expected = AllocationConfigurationException.class)
+  public void testNoUnits() throws Exception { // a bare number with no unit is expected to be rejected
+    parseResourceConfigValue("1024");
+  }
+  
+  @Test(expected = AllocationConfigurationException.class)
+  public void testOnlyMemory() throws Exception { // memory without a vcores part is expected to be rejected
+    parseResourceConfigValue("1024mb");
+  }
+
+  @Test(expected = AllocationConfigurationException.class)
+  public void testOnlyCPU() throws Exception { // vcores without a memory part is expected to be rejected
+    parseResourceConfigValue("1024vcores");
+  }
+  
+  @Test(expected = AllocationConfigurationException.class)
+  public void testGibberish() throws Exception { // malformed number and unit are expected to be rejected
+    parseResourceConfigValue("1o24vc0res");
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java Mon Jun  3 17:33:55 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.junit.Test;
@@ -49,6 +50,11 @@ public class TestSchedulingPolicy {
     assertTrue("Invalid scheduler name",
         sm.getName().equals(FairSharePolicy.NAME));
 
+    // Shortname - drf
+    sm = SchedulingPolicy.parse("drf");
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(DominantResourceFairnessPolicy.NAME));
+    
     // Shortname - fair
     sm = SchedulingPolicy.parse("fair");
     assertTrue("Invalid scheduler name",
@@ -93,7 +99,20 @@ public class TestSchedulingPolicy {
         SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
     assertTrue(ERR,
         SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
-
+    
+    // drf
+    policy = SchedulingPolicy.parse("drf"); 
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
+    assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
+        SchedulingPolicy.DEPTH_INTERMEDIATE));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
+    
     policy = Mockito.mock(SchedulingPolicy.class);
     Mockito.when(policy.getApplicableDepth()).thenReturn(
         SchedulingPolicy.DEPTH_PARENT);