You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/03/26 04:25:27 UTC

svn commit: r1460961 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ hadoop-yarn/hadoop-yarn-server/hadoop-y...

Author: tucu
Date: Tue Mar 26 03:25:26 2013
New Revision: 1460961

URL: http://svn.apache.org/r1460961
Log:
YARN-469. Make scheduling mode in FS pluggable. (kkambatl via tucu)

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java
Removed:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Mar 26 03:25:26 2013
@@ -96,6 +96,8 @@ Release 2.0.5-beta - UNRELEASED
     YARN-497. Yarn unmanaged-am launcher jar does not define a main class in
     its manifest (Hitesh Shah via bikas)
 
+    YARN-469. Make scheduling mode in FS pluggable. (kkambatl via tucu)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Tue Mar 26 03:25:26 2013
@@ -92,13 +92,7 @@ public class FSLeafQueue extends FSQueue
   
   @Override
   public void recomputeFairShares() {
-    if (schedulingMode == SchedulingMode.FAIR) {
-      SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
-    } else {
-      for (AppSchedulable sched: appScheds) {
-        sched.setFairShare(Resources.createResource(0));
-      }
-    }
+    schedulingMode.computeShares(getAppSchedulables(), getFairShare());
   }
 
   @Override
@@ -162,17 +156,9 @@ public class FSLeafQueue extends FSQueue
       return Resources.none(); // We should never get here
     }
 
-    // Otherwise, chose app to schedule based on given policy (fair vs fifo).
+    // Otherwise, choose app to schedule based on given policy.
     else {
-      Comparator<Schedulable> comparator;
-      if (schedulingMode == SchedulingMode.FIFO) {
-        comparator = new SchedulingAlgorithms.FifoComparator();
-      } else if (schedulingMode == SchedulingMode.FAIR) {
-        comparator = new SchedulingAlgorithms.FairShareComparator();
-      } else {
-        throw new RuntimeException("Unsupported queue scheduling mode " + 
-            schedulingMode);
-      }
+      Comparator<Schedulable> comparator = schedulingMode.getComparator();
 
       Collections.sort(appScheds, comparator);
       for (AppSchedulable sched: appScheds) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Tue Mar 26 03:25:26 2013
@@ -51,7 +51,7 @@ public class FSParentQueue extends FSQue
 
   @Override
   public void recomputeFairShares() {
-    SchedulingAlgorithms.computeFairShares(childQueues, getFairShare());
+    SchedulingMode.getDefault().computeShares(childQueues, getFairShare());
     for (FSQueue childQueue : childQueues) {
       childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
       childQueue.recomputeFairShares();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Mar 26 03:25:26 2013
@@ -803,7 +803,7 @@ public class FairScheduler implements Re
         // At most one task is scheduled each iteration of this loop
         List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
             queueMgr.getLeafQueues());
-        Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
+        Collections.sort(scheds, SchedulingMode.getDefault().getComparator());
         boolean assignedContainer = false;
         for (FSLeafQueue sched : scheds) {
           Resource assigned = sched.assignContainer(node, false);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Tue Mar 26 03:25:26 2013
@@ -310,7 +310,7 @@ public class QueueManager {
     int queueMaxAppsDefault = Integer.MAX_VALUE;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-    SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+    SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault();
 
     // Remember all queue names so we can display them on web UI, etc.
     List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -373,7 +373,8 @@ public class QueueManager {
         queueMaxAppsDefault = val;}
       else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
         String text = ((Text)element.getFirstChild()).getData().trim();
-        defaultSchedulingMode = parseSchedulingMode(text);
+        SchedulingMode.setDefault(text);
+        defaultSchedulingMode = SchedulingMode.getDefault();
       } else {
         LOG.warn("Bad element in allocations file: " + element.getTagName());
       }
@@ -449,7 +450,7 @@ public class QueueManager {
         minSharePreemptionTimeouts.put(queueName, val);
       } else if ("schedulingMode".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        queueModes.put(queueName, parseSchedulingMode(text));
+        queueModes.put(queueName, SchedulingMode.parse(text));
       } else if ("aclSubmitApps".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -476,19 +477,6 @@ public class QueueManager {
     }
   }
 
-  private SchedulingMode parseSchedulingMode(String text)
-      throws AllocationConfigurationException {
-    text = text.toLowerCase();
-    if (text.equals("fair")) {
-      return SchedulingMode.FAIR;
-    } else if (text.equals("fifo")) {
-      return SchedulingMode.FIFO;
-    } else {
-      throw new AllocationConfigurationException(
-          "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
-    }
-  }
-
   /**
    * Get the minimum resource allocation for the given queue.
    * @return the cap set on this queue, or 0 if not set.
@@ -663,7 +651,7 @@ public class QueueManager {
       minSharePreemptionTimeouts = new HashMap<String, Long>();
       defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
       fairSharePreemptionTimeout = Long.MAX_VALUE;
-      defaultSchedulingMode = SchedulingMode.FAIR;
+      defaultSchedulingMode = SchedulingMode.getDefault();
     }
   }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Tue Mar 26 03:25:26 2013
@@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.res
  */
 @Private
 @Unstable
-abstract class Schedulable {
+public abstract class Schedulable {
   /** Fair share assigned to this Schedulable */
   private Resource fairShare = Resources.createResource(0);
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java Tue Mar 26 03:25:26 2013
@@ -15,17 +15,104 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
 
-/**
- * Internal scheduling modes for queues.
- */
-@Private
+@Public
 @Unstable
-public enum SchedulingMode {
-  FAIR, FIFO
+public abstract class SchedulingMode {
+  private static final ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode> instances =
+      new ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode>();
+
+  private static SchedulingMode DEFAULT_MODE =
+      getInstance(FairSchedulingMode.class);
+  
+  public static SchedulingMode getDefault() {
+    return DEFAULT_MODE;
+  }
+
+  public static void setDefault(String className)
+      throws AllocationConfigurationException {
+    DEFAULT_MODE = parse(className);
+  }
+
+  /**
+   * Returns a {@link SchedulingMode} instance corresponding to the passed clazz
+   */
+  public static SchedulingMode getInstance(Class<? extends SchedulingMode> clazz) {
+    SchedulingMode mode = instances.get(clazz);
+    if (mode == null) {
+      mode = ReflectionUtils.newInstance(clazz, null);
+      instances.put(clazz, mode);
+    }
+    return mode;
+  }
+
+  /**
+   * Returns {@link SchedulingMode} instance corresponding to the
+   * {@link SchedulingMode} passed as a string. The mode can be "fair" for
+   * FairSchedulingMode or "fifo" for FifoSchedulingMode. For custom
+   * {@link SchedulingMode}s in the RM classpath, the mode should be canonical
+   * class name of the {@link SchedulingMode}.
+   * 
+   * @param mode canonical class name or "fair" or "fifo"
+   * @throws AllocationConfigurationException
+   */
+  @SuppressWarnings("unchecked")
+  public static SchedulingMode parse(String mode)
+      throws AllocationConfigurationException {
+    @SuppressWarnings("rawtypes")
+    Class clazz;
+    String text = mode.toLowerCase();
+    if (text.equals("fair")) {
+      clazz = FairSchedulingMode.class;
+    } else if (text.equals("fifo")) {
+      clazz = FifoSchedulingMode.class;
+    } else {
+      try {
+        clazz = Class.forName(mode);
+      } catch (ClassNotFoundException cnfe) {
+        throw new AllocationConfigurationException(mode
+            + " SchedulingMode class not found!");
+      }
+    }
+    if (!SchedulingMode.class.isAssignableFrom(clazz)) {
+      throw new AllocationConfigurationException(mode
+          + " does not extend SchedulingMode");
+    }
+    return getInstance(clazz);
+  }
+
+  /**
+   * @return the name of this SchedulingMode
+   */
+  public abstract String getName();
+
+  /**
+   * The comparator returned by this method is to be used for sorting the
+   * {@link Schedulable}s in that queue.
+   * 
+   * @return the comparator to sort by
+   */
+  public abstract Comparator<Schedulable> getComparator();
+
+  /**
+   * Computes and updates the shares of {@link Schedulable}s as per the
+   * SchedulingMode, to be used later at schedule time.
+   * 
+   * @param schedulables {@link Schedulable}s whose shares are to be updated
+   * @param totalResources Total {@link Resource}s to share among the schedulables
+   */
+  public abstract void computeShares(
+      Collection<? extends Schedulable> schedulables, Resource totalResources);
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java?rev=1460961&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java Tue Mar 26 03:25:26 2013
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class FairSchedulingMode extends SchedulingMode {
+  @VisibleForTesting
+  public static final String NAME = "FairShare";
+  private FairShareComparator comparator = new FairShareComparator();
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /**
+   * Compare Schedulables via weighted fair sharing. In addition, Schedulables
+   * below their min share get priority over those whose min share is met.
+   * 
+   * Schedulables below their min share are compared by how far below it they
+   * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
+   * and job B has 50 out of a min share of 100, then job B is scheduled next,
+   * because B is at 50% of its min share and A is at 80% of its min share.
+   * 
+   * Schedulables above their min share are compared by (runningTasks / weight).
+   * If all weights are equal, slots are given to the job with the fewest tasks;
+   * otherwise, jobs with more weight get proportionally more slots.
+   */
+  private static class FairShareComparator implements Comparator<Schedulable>,
+      Serializable {
+    private static final long serialVersionUID = 5564969375856699313L;
+
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      double minShareRatio1, minShareRatio2;
+      double useToWeightRatio1, useToWeightRatio2;
+      Resource minShare1 = Resources.min(s1.getMinShare(), s1.getDemand());
+      Resource minShare2 = Resources.min(s2.getMinShare(), s2.getDemand());
+      boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
+      boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);
+      Resource one = Resources.createResource(1);
+      minShareRatio1 = (double) s1.getResourceUsage().getMemory()
+          / Resources.max(minShare1, one).getMemory();
+      minShareRatio2 = (double) s2.getResourceUsage().getMemory()
+          / Resources.max(minShare2, one).getMemory();
+      useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
+      useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+      int res = 0;
+      if (s1Needy && !s2Needy)
+        res = -1;
+      else if (s2Needy && !s1Needy)
+        res = 1;
+      else if (s1Needy && s2Needy)
+        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
+      else
+        // Neither schedulable is needy
+        res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
+      if (res == 0) {
+        // Apps are tied in fairness ratio. Break the tie by submit time and job
+        // name to get a deterministic ordering, which is useful for unit tests.
+        res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+        if (res == 0)
+          res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+
+  @Override
+  public Comparator<Schedulable> getComparator() {
+    return comparator;
+  }
+
+  @Override
+  public void computeShares(Collection<? extends Schedulable> schedulables,
+      Resource totalResources) {
+    computeFairShares(schedulables, totalResources);
+  }
+
+  /**
+   * Number of iterations for the binary search in computeFairShares. This is
+   * equivalent to the number of bits of precision in the output. 25 iterations
+   * gives precision better than 0.1 slots in clusters with one million slots.
+   */
+  private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+  /**
+   * Given a set of Schedulables and a number of slots, compute their weighted
+   * fair shares. The min shares and demands of the Schedulables are assumed to
+   * be set beforehand. We compute the fairest possible allocation of shares to
+   * the Schedulables that respects their min shares and demands.
+   * 
+   * To understand what this method does, we must first define what weighted
+   * fair sharing means in the presence of minimum shares and demands. If there
+   * were no minimum shares and every Schedulable had an infinite demand (i.e.
+   * could launch infinitely many tasks), then weighted fair sharing would be
+   * achieved if the ratio of slotsAssigned / weight was equal for each
+   * Schedulable and all slots were assigned. Minimum shares and demands add two
+   * further twists: (1) some Schedulables may not have enough tasks to fill all
+   * their share, and (2) some Schedulables may have a min share higher than
+   * their assigned share.
+   * 
+   * To deal with these possibilities, we define an assignment of slots as being
+   * fair if there exists a ratio R such that: (1) Schedulables S where S.demand
+   * < R * S.weight are assigned share S.demand; (2) those where S.minShare >
+   * R * S.weight are given share S.minShare; (3) all other Schedulables S are
+   * assigned share R * S.weight; and (4) the sum of all shares is totalSlots.
+   * 
+   * We call R the weight-to-slots ratio because it converts a Schedulable's
+   * weight to the number of slots it is assigned.
+   * 
+   * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+   * To do this, we use binary search. Given a ratio R, we compute the number of
+   * slots that would be used in total with this ratio (the sum of the shares
+   * computed using the conditions above). If this number of slots is less than
+   * totalSlots, then R is too small and more slots could be assigned. If the
+   * number of slots is more than totalSlots, then R is too large.
+   * 
+   * We begin the binary search with a lower bound on R of 0 (which means that
+   * all Schedulables are only given their minShare) and an upper bound computed
+   * to be large enough that too many slots are given (by doubling R until we
+   * either use more than totalSlots slots or we fulfill all jobs' demands). The
+   * helper method slotsUsedWithWeightToSlotRatio computes the total number of
+   * slots used with a given value of R.
+   * 
+   * The running time of this algorithm is linear in the number of Schedulables,
+   * because slotsUsedWithWeightToSlotRatio is linear-time and the number of
+   * iterations of binary search is a constant (dependent on desired precision).
+   */
+  public static void computeFairShares(
+      Collection<? extends Schedulable> schedulables, Resource totalResources) {
+    // Find an upper bound on R that we can use in our binary search. We start
+    // at R = 1 and double it until we have either used totalSlots slots or we
+    // have met all Schedulables' demands (if total demand < totalSlots).
+    Resource totalDemand = Resources.createResource(0);
+    for (Schedulable sched : schedulables) {
+      Resources.addTo(totalDemand, sched.getDemand());
+    }
+    Resource cap = Resources.min(totalDemand, totalResources);
+    double rMax = 1.0;
+    while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables),
+        cap)) {
+      rMax *= 2.0;
+    }
+    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+    double left = 0;
+    double right = rMax;
+    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+      double mid = (left + right) / 2.0;
+      if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables),
+          cap)) {
+        left = mid;
+      } else {
+        right = mid;
+      }
+    }
+    // Set the fair shares based on the value of R we've converged to
+    for (Schedulable sched : schedulables) {
+      sched.setFairShare(computeShare(sched, right));
+    }
+  }
+
+  /**
+   * Compute the total resources that would be used given a weight-to-resource
+   * ratio w2sRatio, for use in the computeFairShares algorithm as described in
+   * {@link #computeFairShares(Collection, Resource)}.
+   */
+  private static Resource resUsedWithWeightToResRatio(double w2sRatio,
+      Collection<? extends Schedulable> schedulables) {
+    Resource slotsTaken = Resources.createResource(0);
+    for (Schedulable sched : schedulables) {
+      Resource share = computeShare(sched, w2sRatio);
+      Resources.addTo(slotsTaken, share);
+    }
+    return slotsTaken;
+  }
+
+  /**
+   * Compute the resources assigned to a Schedulable given a particular
+   * weight-to-resource ratio r2sRatio, for use in computeFairShares as
+   * described in {@link #computeFairShares(Collection, Resource)}.
+   */
+  private static Resource computeShare(Schedulable sched, double r2sRatio) {
+    double share = sched.getWeight() * r2sRatio;
+    share = Math.max(share, sched.getMinShare().getMemory());
+    share = Math.min(share, sched.getDemand().getMemory());
+    return Resources.createResource((int) share);
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java?rev=1460961&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java Tue Mar 26 03:25:26 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class FifoSchedulingMode extends SchedulingMode { // orders with FifoComparator; see computeShares for the shares it sets
+  @VisibleForTesting
+  public static final String NAME = "FIFO"; // value returned by getName()
+  private FifoComparator comparator = new FifoComparator(); // instance returned by getComparator()
+  /** Returns {@link #NAME}. */
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /**
+   * Compare Schedulables by priority, then start time, then name as the final
+   * tie-break, as in the default FIFO scheduler in Hadoop.
+   */
+  static class FifoComparator implements Comparator<Schedulable>, Serializable {
+    private static final long serialVersionUID = -5905036205491177060L;
+    // NOTE(review): compare() assumes start times are close enough that their difference cannot overflow - TODO confirm.
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      int res = s1.getPriority().compareTo(s2.getPriority()); // priority decides first
+      if (res == 0) { // equal priority: fall back to start time
+        res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); // keeps only the sign: -1, 0 or 1
+      }
+      if (res == 0) {
+        // In the rare case where jobs were submitted at the exact same time,
+        // compare them by name (which will be the JobID) to get a deterministic
+        // ordering, so we don't alternately launch tasks from different jobs.
+        res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+  /** Returns the {@link FifoComparator} instance held by this mode. */
+  @Override
+  public Comparator<Schedulable> getComparator() {
+    return comparator;
+  }
+  // NOTE(review): this file imports Resources from ...scheduler.fair, while TestComputeFairShares imports ...resourcemanager.resource.Resources - confirm the package.
+  @Override
+  public void computeShares(Collection<? extends Schedulable> schedulables,
+      Resource totalResources) { // totalResources is not read in this body
+    for (Schedulable sched : schedulables) {
+      sched.setFairShare(Resources.createResource(0)); // every schedulable gets a createResource(0) share
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java Tue Mar 26 03:25:26 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -32,10 +33,12 @@ import org.junit.Test;
  */
 public class TestComputeFairShares {
   private List<Schedulable> scheds;
+  private SchedulingMode schedulingMode;
   
   @Before
   public void setUp() throws Exception {
     scheds = new ArrayList<Schedulable>();
+    schedulingMode = new FairSchedulingMode();
   }
   
   /** 
@@ -48,7 +51,8 @@ public class TestComputeFairShares {
     scheds.add(new FakeSchedulable(50));
     scheds.add(new FakeSchedulable(30));
     scheds.add(new FakeSchedulable(20));
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
+    schedulingMode.computeShares(scheds,
+        Resources.createResource(40));
     verifyShares(10, 10, 10, 10);
   }
   
@@ -65,7 +69,8 @@ public class TestComputeFairShares {
     scheds.add(new FakeSchedulable(50));
     scheds.add(new FakeSchedulable(11));
     scheds.add(new FakeSchedulable(3));
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
+    schedulingMode.computeShares(scheds,
+        Resources.createResource(40));
     verifyShares(13, 13, 11, 3);
   }
   
@@ -83,7 +88,8 @@ public class TestComputeFairShares {
     scheds.add(new FakeSchedulable(10, 20));
     scheds.add(new FakeSchedulable(10, 0));
     scheds.add(new FakeSchedulable(3, 2));
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
+    schedulingMode.computeShares(scheds,
+        Resources.createResource(40));
     verifyShares(20, 10, 7, 3);
   }
   
@@ -97,7 +103,8 @@ public class TestComputeFairShares {
     scheds.add(new FakeSchedulable(50,  0, 1.0));
     scheds.add(new FakeSchedulable(30,  0, 1.0));
     scheds.add(new FakeSchedulable(20,  0, 0.5));
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
+    schedulingMode.computeShares(scheds,
+        Resources.createResource(45));
     verifyShares(20, 10, 10, 5);
   }
 
@@ -114,7 +121,8 @@ public class TestComputeFairShares {
     scheds.add(new FakeSchedulable(11, 0, 1.0));
     scheds.add(new FakeSchedulable(30, 0, 1.0));
     scheds.add(new FakeSchedulable(20, 0, 0.5));
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
+    schedulingMode.computeShares(scheds,
+        Resources.createResource(45));
     verifyShares(10, 11, 16, 8);
   }
 
@@ -131,7 +139,8 @@ public class TestComputeFairShares {
     scheds.add(new FakeSchedulable(11, 0, 1.0));
     scheds.add(new FakeSchedulable(30, 5, 1.0));
     scheds.add(new FakeSchedulable(20, 15, 0.5));
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
+    schedulingMode.computeShares(scheds,
+        Resources.createResource(45));
     verifyShares(10, 10, 10, 15);
   }
 
@@ -146,7 +155,9 @@ public class TestComputeFairShares {
     scheds.add(new FakeSchedulable(50 * million));
     scheds.add(new FakeSchedulable(30 * million));
     scheds.add(new FakeSchedulable(20 * million));
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40 * million));
+    schedulingMode
+        .computeShares(scheds,
+        Resources.createResource(40 * million));
     verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
   }
 
@@ -159,7 +170,8 @@ public class TestComputeFairShares {
     scheds.add(new FakeSchedulable(50));
     scheds.add(new FakeSchedulable(30));
     scheds.add(new FakeSchedulable(0));
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(30));
+    schedulingMode.computeShares(scheds,
+        Resources.createResource(30));
     verifyShares(10, 10, 10, 0);
   }
   
@@ -168,7 +180,8 @@ public class TestComputeFairShares {
    */
   @Test
   public void testEmptyList() {
-    SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
+    schedulingMode.computeShares(scheds,
+        Resources.createResource(40));
     verifyShares();
   }
   

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Mar 26 03:25:26 2013
@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -1325,7 +1326,7 @@ public class TestFairScheduler {
     FSSchedulerApp app2 = scheduler.applications.get(attId2);
     
     FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1");
-    queue1.setSchedulingMode(SchedulingMode.FIFO);
+    queue1.setSchedulingMode(new FifoSchedulingMode());
     
     scheduler.update();
 

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java?rev=1460961&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java Tue Mar 26 03:25:26 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
+import org.junit.Test;
+
+public class TestSchedulingMode {
+  /** Checks that each way of naming a scheduling mode yields an instance with the expected getName(). */
+  @Test(timeout = 1000)
+  public void testParseSchedulingMode() throws AllocationConfigurationException {
+
+    // Fully qualified class name, from Class.getName(), passed to parse()
+    SchedulingMode sm = SchedulingMode
+        .parse(FairSchedulingMode.class.getName());
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FairSchedulingMode.NAME));
+
+    // Canonical name, from Class.getCanonicalName(), passed to parse()
+    sm = SchedulingMode.parse(FairSchedulingMode.class
+        .getCanonicalName());
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FairSchedulingMode.NAME));
+
+    // Class object, passed to getInstance() instead of parse()
+    sm = SchedulingMode.getInstance(FairSchedulingMode.class);
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FairSchedulingMode.NAME));
+
+    // Short name "fair" must resolve to the fair mode
+    sm = SchedulingMode.parse("fair");
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FairSchedulingMode.NAME));
+
+    // Short name "fifo" must resolve to the FIFO mode
+    sm = SchedulingMode.parse("fifo");
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FifoSchedulingMode.NAME));
+  }
+}