You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2009/08/14 18:32:05 UTC

svn commit: r804284 [2/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/fairscheduler/designdoc/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content...

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Fri Aug 14 16:32:04 2009
@@ -44,10 +44,8 @@
  * Servlet for displaying fair scheduler information, installed at
  * [job tracker URL]/scheduler when the {@link FairScheduler} is in use.
  * 
- * The main features are viewing each job's task count and fair share, ability
- * to change job priorities and pools from the UI, and ability to switch the
- * scheduler to FIFO mode without restarting the JobTracker if this is required
- * for any reason.
+ * The main features are viewing each job's task count and fair share,
+ * and admin controls to change job priorities and pools from the UI.
  * 
  * There is also an "advanced" view for debugging that can be turned on by
  * going to [job tracker URL]/scheduler?advanced.
@@ -82,12 +80,6 @@
     // view page so that the user won't resubmit the data if they hit refresh.
     boolean advancedView = request.getParameter("advanced") != null;
     if (JSPUtil.privateActionsAllowed()
-        && request.getParameter("setFifo") != null) {
-      scheduler.setUseFifo(request.getParameter("setFifo").equals("true"));
-      response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
-      return;
-    }
-    if (JSPUtil.privateActionsAllowed()
         && request.getParameter("setPool") != null) {
       Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
       PoolManager poolMgr = null;
@@ -130,17 +122,14 @@
     String hostname = StringUtils.simpleHostname(
         jobTracker.getJobTrackerMachine());
     out.print("<html><head>");
-    out.printf("<title>%s Job Scheduler Admininstration</title>\n", hostname);
+    out.printf("<title>%s Fair Scheduler Admininstration</title>\n", hostname);
     out.print("<link rel=\"stylesheet\" type=\"text/css\" " + 
         "href=\"/static/hadoop.css\">\n");
     out.print("</head><body>\n");
     out.printf("<h1><a href=\"/jobtracker.jsp\">%s</a> " + 
-        "Job Scheduler Administration</h1>\n", hostname);
+        "Fair Scheduler Administration</h1>\n", hostname);
     showPools(out, advancedView);
     showJobs(out, advancedView);
-    if (JSPUtil.privateActionsAllowed()) {
-      showAdminForm(out, advancedView);
-    }
     out.print("</body></html>\n");
     out.close();
   }
@@ -153,9 +142,13 @@
       PoolManager poolManager = scheduler.getPoolManager();
       out.print("<h2>Pools</h2>\n");
       out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
-      out.print("<tr><th>Pool</th><th>Running Jobs</th>" + 
-          "<th>Min Maps</th><th>Min Reduces</th>" + 
-          "<th>Running Maps</th><th>Running Reduces</th></tr>\n");
+      out.print("<tr><th rowspan=2>Pool</th>" +
+          "<th rowspan=2>Running Jobs</th>" + 
+          "<th colspan=3>Map Tasks</th>" + 
+          "<th colspan=3>Reduce Tasks</th>" +
+          "<th rowspan=2>Scheduling Mode</th></tr>\n<tr>" + 
+          "<th>Min Share</th><th>Running</th><th>Fair Share</th>" + 
+          "<th>Min Share</th><th>Running</th><th>Fair Share</th></tr>\n");
       List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
       Collections.sort(pools, new Comparator<Pool>() {
         public int compare(Pool p1, Pool p2) {
@@ -166,24 +159,21 @@
           else return p1.getName().compareTo(p2.getName());
         }});
       for (Pool pool: pools) {
-        int runningMaps = 0;
-        int runningReduces = 0;
-        for (JobInProgress job: pool.getJobs()) {
-          JobInfo info = scheduler.infos.get(job);
-          if (info != null) {
-            runningMaps += info.runningMaps;
-            runningReduces += info.runningReduces;
-          }
-        }
-        out.print("<tr>\n");
-        out.printf("<td>%s</td>\n", pool.getName());
-        out.printf("<td>%s</td>\n", pool.getJobs().size());
-        out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+        String name = pool.getName();
+        int runningMaps = pool.getMapSchedulable().getRunningTasks();
+        int runningReduces = pool.getReduceSchedulable().getRunningTasks();
+        out.print("<tr>");
+        out.printf("<td>%s</td>", name);
+        out.printf("<td>%d</td>", pool.getJobs().size());
+        out.printf("<td>%d</td>", poolManager.getAllocation(name,
             TaskType.MAP));
-        out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(), 
+        out.printf("<td>%d</td>", runningMaps);
+        out.printf("<td>%.1f</td>", pool.getMapSchedulable().getFairShare());
+        out.printf("<td>%d</td>", poolManager.getAllocation(name,
             TaskType.REDUCE));
-        out.printf("<td>%s</td>\n", runningMaps);
-        out.printf("<td>%s</td>\n", runningReduces);
+        out.printf("<td>%d</td>", runningReduces);
+        out.printf("<td>%.1f</td>", pool.getReduceSchedulable().getFairShare());
+        out.printf("<td>%s</td>", pool.getSchedulingMode());
         out.print("</tr>\n");
       }
       out.print("</table>\n");
@@ -196,21 +186,21 @@
   private void showJobs(PrintWriter out, boolean advancedView) {
     out.print("<h2>Running Jobs</h2>\n");
     out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
-    int colsPerTaskType = advancedView ? 6 : 3;
+    int colsPerTaskType = advancedView ? 4 : 3;
     out.printf("<tr><th rowspan=2>Submitted</th>" + 
         "<th rowspan=2>JobID</th>" +
         "<th rowspan=2>User</th>" +
         "<th rowspan=2>Name</th>" +
         "<th rowspan=2>Pool</th>" +
         "<th rowspan=2>Priority</th>" +
-        "<th colspan=%d>Maps</th>" +
-        "<th colspan=%d>Reduces</th>",
+        "<th colspan=%d>Map Tasks</th>" +
+        "<th colspan=%d>Reduce Tasks</th>",
         colsPerTaskType, colsPerTaskType);
     out.print("</tr><tr>\n");
     out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
-        (advancedView ? "<th>Weight</th><th>Deficit</th><th>minMaps</th>" : ""));
+        (advancedView ? "<th>Weight</th>" : ""));
     out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
-        (advancedView ? "<th>Weight</th><th>Deficit</th><th>minReduces</th>" : ""));
+        (advancedView ? "<th>Weight</th>" : ""));
     out.print("</tr>\n");
     Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
     synchronized (scheduler) {
@@ -218,7 +208,7 @@
         JobProfile profile = job.getProfile();
         JobInfo info = scheduler.infos.get(job);
         if (info == null) { // Job finished, but let's show 0's for info
-          info = new JobInfo(0);
+          info = new JobInfo(null, null);
         }
         out.print("<tr>\n");
         out.printf("<td>%s</td>\n", DATE_FORMAT.format(
@@ -241,23 +231,24 @@
           out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
           out.printf("<td>%s</td>\n", job.getPriority().toString());
         }
-        out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
-            job.finishedMaps(), job.desiredMaps(), info.runningMaps,
-            info.mapFairShare);
+        Pool pool = scheduler.getPoolManager().getPool(job);
+        String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+            String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
+        out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+            job.finishedMaps(), job.desiredMaps(), 
+            info.mapSchedulable.getRunningTasks(),
+            mapShare);
         if (advancedView) {
-          out.printf("<td>%8.1f</td>\n", info.mapWeight);
-          out.printf("<td>%s</td>\n", info.neededMaps > 0 ?
-              (info.mapDeficit / 1000) + "s" : "--");
-          out.printf("<td>%d</td>\n", info.minMaps);
+          out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
         }
-        out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
-            job.finishedReduces(), job.desiredReduces(), info.runningReduces,
-            info.reduceFairShare);
+        String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+            String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
+        out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+            job.finishedReduces(), job.desiredReduces(), 
+            info.reduceSchedulable.getRunningTasks(),
+            reduceShare);
         if (advancedView) {
-          out.printf("<td>%8.1f</td>\n", info.reduceWeight);
-          out.printf("<td>%s</td>\n", info.neededReduces > 0 ?
-              (info.reduceDeficit / 1000) + "s" : "--");
-          out.printf("<td>%d</td>\n", info.minReduces);
+          out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
         }
         out.print("</tr>\n");
       }
@@ -287,24 +278,4 @@
     html.append("</select>\n");
     return html.toString();
   }
-
-  /**
-   * Print the administration form at the bottom of the page, which currently
-   * only includes the button for switching between FIFO and Fair Scheduling.
-   */
-  private void showAdminForm(PrintWriter out, boolean advancedView) {
-    out.print("<h2>Scheduling Mode</h2>\n");
-    String curMode = scheduler.getUseFifo() ? "FIFO" : "Fair Sharing";
-    String otherMode = scheduler.getUseFifo() ? "Fair Sharing" : "FIFO";
-    String advParam = advancedView ? "?advanced" : "";
-    out.printf("<form method=\"post\" action=\"/scheduler%s\">\n", advParam);
-    out.printf("<p>The scheduler is currently using <b>%s mode</b>. " +
-        "<input type=\"submit\" value=\"Switch to %s mode.\" " + 
-        "onclick=\"return confirm('Are you sure you want to change " +
-        "scheduling mode to %s?')\" />\n",
-        curMode, otherMode, otherMode);
-    out.printf("<input type=\"hidden\" name=\"setFifo\" value=\"%s\" />",
-        !scheduler.getUseFifo());
-    out.print("</form>\n");
-  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java Fri Aug 14 16:32:04 2009
@@ -35,7 +35,8 @@
       }
     }
     if (res == 0) {
-      res = j1.hashCode() - j2.hashCode();
+      // If there is a tie, break it by job ID to get a deterministic order
+      res = j1.getJobID().compareTo(j2.getJobID());
     }
     return res;
   }

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * A {@link Schedulable} covering the tasks of one {@link TaskType} (map or
+ * reduce) of a single {@link JobInProgress}. Running-task counts, priority
+ * and start time are read from the job; weight and runnability are read from
+ * the owning {@link FairScheduler}.
+ */
+public class JobSchedulable extends Schedulable {
+  private FairScheduler scheduler;
+  private JobInProgress job;
+  private TaskType taskType;
+  // Slot demand as computed by the most recent updateDemand() call
+  private int demand = 0;
+
+  /**
+   * @param scheduler scheduler consulted for job info, clock, weight,
+   *        locality level and event log
+   * @param job the job whose tasks this schedulable represents
+   * @param taskType which of the job's task types (map or reduce) to cover
+   */
+  public JobSchedulable(FairScheduler scheduler, JobInProgress job, 
+      TaskType taskType) {
+    this.scheduler = scheduler;
+    this.job = job;
+    this.taskType = taskType;
+  }
+  
+  /** Returns the job's ID in string form. */
+  @Override
+  public String getName() {
+    return job.getJobID().toString();
+  }
+
+  /** Returns the job this schedulable was created for. */
+  public JobInProgress getJob() {
+    return job;
+  }
+  
+  /**
+   * Recompute {@link #getDemand()} from the job's current task state. The
+   * demand is left at 0 when the job is not runnable, or when this is the
+   * reduce schedulable and the job reports that reduces cannot be scheduled
+   * yet.
+   */
+  @Override
+  public void updateDemand() {
+    demand = 0;
+    if (isRunnable()) {
+      // For reduces, make sure enough maps are done that reduces can launch
+      if (taskType == TaskType.REDUCE && !job.scheduleReduces())
+        return;
+      // Add up demand from each TaskInProgress; each TIP can either
+      // - have no attempts running, in which case it demands 1 slot
+      // - have N attempts running, in which case it demands N slots, and may
+      //   potentially demand one more slot if it needs to be speculated
+      TaskInProgress[] tips = (taskType == TaskType.MAP ? 
+          job.getMapTasks() : job.getReduceTasks());
+      boolean speculationEnabled = (taskType == TaskType.MAP ?
+          job.hasSpeculativeMaps() : job.hasSpeculativeReduces());
+      // Read the clock once so every TIP is evaluated against the same time
+      long time = scheduler.getClock().getTime();
+      for (TaskInProgress tip: tips) {
+        if (!tip.isComplete()) {
+          if (tip.isRunning()) {
+            // Count active tasks and any speculative task we want to launch
+            demand += tip.getActiveTasks().size();
+            if (speculationEnabled && tip.canBeSpeculated(time))
+              demand += 1;
+          } else {
+            // Need to launch 1 task
+            demand += 1;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns true only if the scheduler still has a JobInfo for the job, that
+   * info is flagged runnable, and the job's run state is RUNNING.
+   */
+  private boolean isRunnable() {
+    JobInfo info = scheduler.getJobInfo(job);
+    int runState = job.getStatus().getRunState();
+    return (info != null && info.runnable && runState == JobStatus.RUNNING);
+  }
+
+  /** Returns the demand computed by the last {@link #updateDemand()} call. */
+  @Override
+  public int getDemand() {
+    return demand;
+  }
+  
+  /** Intentionally empty: this schedulable performs no redistribution. */
+  @Override
+  public void redistributeShare() {}
+
+  /** Returns the job's priority. */
+  @Override
+  public JobPriority getPriority() {
+    return job.getPriority();
+  }
+
+  /** Returns the job's running map or reduce count, per this task type. */
+  @Override
+  public int getRunningTasks() {
+    return taskType == TaskType.MAP ? job.runningMaps() : job.runningReduces();
+  }
+
+  /** Returns the job's start time. */
+  @Override
+  public long getStartTime() {
+    return job.startTime;
+  }
+  
+  /** Returns the weight the scheduler computes for this job and task type. */
+  @Override
+  public double getWeight() {
+    return scheduler.getJobWeight(job, taskType);
+  }
+  
+  /** Always 0: no minimum share is reported at the job level. */
+  @Override
+  public int getMinShare() {
+    return 0;
+  }
+
+  /**
+   * Ask the job for a new task of this schedulable's type on the given
+   * tracker.
+   *
+   * @param tts status of the task tracker offering a slot
+   * @param currentTime time passed to the scheduler's locality-level lookup
+   * @param visited collection to which the job is added when it is runnable
+   * @return whatever the job's obtainNewMapTask / obtainNewReduceTask call
+   *         returns, or null without consulting the job if it is not runnable
+   * @throws IOException declared by the {@link Schedulable} contract and the
+   *         calls made here
+   */
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    if (isRunnable()) {
+      visited.add(job);
+      TaskTrackerManager ttm = scheduler.taskTrackerManager;
+      ClusterStatus clusterStatus = ttm.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+      if (taskType == TaskType.MAP) {
+        LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
+            job, currentTime);
+        scheduler.getEventLog().log(
+            "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
+        // obtainNewMapTask needs to be passed 1 + the desired locality level
+        return job.obtainNewMapTask(tts, numTaskTrackers,
+            ttm.getNumberOfUniqueHosts(), localityLevel.toCacheLevelCap());
+      } else {
+        return job.obtainNewReduceTask(tts, numTaskTrackers,
+            ttm.getNumberOfUniqueHosts());
+      }
+    } else {
+      return null;
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Represents the level of data-locality at which a job in the fair scheduler
+ * is allowed to launch tasks. By default, jobs are not allowed to launch
+ * non-data-local tasks until they have waited a small number of seconds to
+ * find a slot on a node that they have data on. If a job has waited this
+ * long, it is allowed to launch rack-local tasks as well (on nodes that may
+ * not have the task's input data, but share a rack with a node that does).
+ * Finally, after a further wait, jobs are allowed to launch tasks anywhere
+ * in the cluster.
+ * 
+ * This enum defines three levels - NODE, RACK and ANY (for allowing tasks
+ * to be launched on any node). A map task's level can be obtained from
+ * its job through {@link #fromTask(JobInProgress, Task, TaskTrackerStatus)}. In
+ * addition, for any locality level, it is possible to get a "level cap" to pass
+ * to {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
+ * to ensure that only tasks at this level or lower are launched, through
+ * the {@link #toCacheLevelCap()} method.
+ */
+public enum LocalityLevel {
+  NODE, RACK, ANY;
+  
+  /**
+   * Determine the locality level of a map task with respect to a tracker.
+   * The task's TaskInProgress is looked up in the job and passed to
+   * {@link JobInProgress#getLocalityLevel}; a result of 0 maps to NODE,
+   * 1 maps to RACK, and any other value maps to ANY.
+   *
+   * @param job the job that owns the task
+   * @param mapTask the task whose locality is being classified
+   * @param tracker the task tracker the locality is measured against
+   * @return the matching locality level
+   */
+  public static LocalityLevel fromTask(JobInProgress job, Task mapTask,
+      TaskTrackerStatus tracker) {
+    TaskID tipID = mapTask.getTaskID().getTaskID();
+    TaskInProgress tip = job.getTaskInProgress(tipID);
+    switch (job.getLocalityLevel(tip, tracker)) {
+    case 0: return LocalityLevel.NODE;
+    case 1: return LocalityLevel.RACK;
+    default: return LocalityLevel.ANY;
+    }
+  }
+  
+  /**
+   * Obtain a JobInProgress cache level cap to pass to
+   * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
+   * to ensure that only tasks of this locality level and lower are launched.
+   * NODE yields 1, RACK yields 2, and ANY yields Integer.MAX_VALUE.
+   */
+  public int toCacheLevelCap() {
+    switch(this) {
+    case NODE: return 1;
+    case RACK: return 2;
+    default: return Integer.MAX_VALUE;
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java Fri Aug 14 16:32:04 2009
@@ -21,6 +21,8 @@
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.hadoop.mapreduce.TaskType;
+
 /**
  * A schedulable pool of jobs.
  */
@@ -33,9 +35,17 @@
   
   /** Jobs in this specific pool; does not include children pools' jobs. */
   private Collection<JobInProgress> jobs = new ArrayList<JobInProgress>();
+  
+  /** Scheduling mode for jobs inside the pool (fair or FIFO) */
+  private SchedulingMode schedulingMode;
 
-  public Pool(String name) {
+  private PoolSchedulable mapSchedulable;
+  private PoolSchedulable reduceSchedulable;
+
+  public Pool(FairScheduler scheduler, String name) {
     this.name = name;
+    mapSchedulable = new PoolSchedulable(scheduler, this, TaskType.MAP);
+    reduceSchedulable = new PoolSchedulable(scheduler, this, TaskType.REDUCE);
   }
   
   public Collection<JobInProgress> getJobs() {
@@ -44,17 +54,41 @@
   
   public void addJob(JobInProgress job) {
     jobs.add(job);
+    mapSchedulable.addJob(job);
+    reduceSchedulable.addJob(job);
   }
   
   public void removeJob(JobInProgress job) {
     jobs.remove(job);
+    mapSchedulable.removeJob(job);
+    reduceSchedulable.removeJob(job);
   }
   
   public String getName() {
     return name;
   }
 
+  public SchedulingMode getSchedulingMode() {
+    return schedulingMode;
+  }
+  
+  public void setSchedulingMode(SchedulingMode schedulingMode) {
+    this.schedulingMode = schedulingMode;
+  }
+
   public boolean isDefaultPool() {
     return Pool.DEFAULT_POOL_NAME.equals(name);
   }
+  
+  public PoolSchedulable getMapSchedulable() {
+    return mapSchedulable;
+  }
+  
+  public PoolSchedulable getReduceSchedulable() {
+    return reduceSchedulable;
+  }
+  
+  public PoolSchedulable getSchedulable(TaskType type) {
+    return type == TaskType.MAP ? mapSchedulable : reduceSchedulable;
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Fri Aug 14 16:32:04 2009
@@ -60,6 +60,8 @@
    * (this is done to prevent loading a file that hasn't been fully written).
    */
   public static final long ALLOC_RELOAD_WAIT = 5 * 1000; 
+
+  private final FairScheduler scheduler;
   
   // Map and reduce minimum allocations for each pool
   private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
@@ -89,6 +91,8 @@
   // below half its fair share for this long, it is allowed to preempt tasks.
   private long fairSharePreemptionTimeout = Long.MAX_VALUE;
   
+  SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+  
   private Object allocFile; // Path to XML file containing allocations. This
                             // is either a URL to specify a classpath resource
                             // (if the fair-scheduler.xml on the classpath is
@@ -103,8 +107,13 @@
   private long lastSuccessfulReload; // Last time we successfully reloaded pools
   private boolean lastReloadAttemptFailed = false;
 
-  public PoolManager(Configuration conf) throws IOException, SAXException,
+  public PoolManager(FairScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+  
+  public void initialize() throws IOException, SAXException,
       AllocationConfigurationException, ParserConfigurationException {
+    Configuration conf = scheduler.getConf();
     this.poolNameProperty = conf.get(
         "mapred.fairscheduler.poolnameproperty", "user.name");
     this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
@@ -131,11 +140,19 @@
   public synchronized Pool getPool(String name) {
     Pool pool = pools.get(name);
     if (pool == null) {
-      pool = new Pool(name);
+      pool = new Pool(scheduler, name);
+      pool.setSchedulingMode(defaultSchedulingMode);
       pools.put(name, pool);
     }
     return pool;
   }
+  
+  /**
+   * Get the pool that a given job is in.
+   */
+  public Pool getPool(JobInProgress job) {
+    return getPool(getPoolName(job));
+  }
 
   /**
    * Reload allocations file if it hasn't been loaded in a while
@@ -203,11 +220,13 @@
     Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
     Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
     Map<String, Double> poolWeights = new HashMap<String, Double>();
+    Map<String, SchedulingMode> poolModes = new HashMap<String, SchedulingMode>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     int userMaxJobsDefault = Integer.MAX_VALUE;
     int poolMaxJobsDefault = Integer.MAX_VALUE;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+    SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
     
     // Remember all pool names so we can display them on web UI, etc.
     List<String> poolNamesInAllocFile = new ArrayList<String>();
@@ -262,7 +281,10 @@
             String text = ((Text)field.getFirstChild()).getData().trim();
             long val = Long.parseLong(text) * 1000L;
             minSharePreemptionTimeouts.put(poolName, val);
-          } 
+          } else if ("schedulingMode".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            poolModes.put(poolName, parseSchedulingMode(text));
+          }
         }
       } else if ("user".equals(element.getTagName())) {
         String userName = element.getAttribute("name");
@@ -294,6 +316,9 @@
         String text = ((Text)element.getFirstChild()).getData().trim();
         long val = Long.parseLong(text) * 1000L;
         defaultMinSharePreemptionTimeout = val;
+      } else if ("defaultPoolSchedulingMode".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        defaultSchedulingMode = parseSchedulingMode(text);
       } else {
         LOG.warn("Bad element in allocations file: " + element.getTagName());
       }
@@ -312,12 +337,31 @@
       this.poolMaxJobsDefault = poolMaxJobsDefault;
       this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
       this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+      this.defaultSchedulingMode = defaultSchedulingMode;
       for (String name: poolNamesInAllocFile) {
-        getPool(name);
+        Pool pool = getPool(name);
+        if (poolModes.containsKey(name)) {
+          pool.setSchedulingMode(poolModes.get(name));
+        } else {
+          pool.setSchedulingMode(defaultSchedulingMode);
+        }
       }
     }
   }
 
+  /**
+   * Convert a scheduling mode name read from the allocations file into a
+   * {@link SchedulingMode}. Matching is case-insensitive.
+   *
+   * @param text the mode name; "fair" or "fifo" in any letter case
+   * @return the corresponding scheduling mode
+   * @throws AllocationConfigurationException if the name is neither
+   *         "fair" nor "fifo"
+   */
+  private SchedulingMode parseSchedulingMode(String text)
+      throws AllocationConfigurationException {
+    // Lower-case with a fixed locale. The default-locale overload maps 'I' to
+    // a dotless i under e.g. a Turkish locale, so "FIFO" would be rejected.
+    text = text.toLowerCase(java.util.Locale.ENGLISH);
+    if (text.equals("fair")) {
+      return SchedulingMode.FAIR;
+    } else if (text.equals("fifo")) {
+      return SchedulingMode.FIFO;
+    } else {
+      throw new AllocationConfigurationException(
+          "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
+    }
+  }
+
   /**
    * Get the allocation for a particular pool
    */

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Schedulable for one pool and one task type (map or reduce). It aggregates
+ * the JobSchedulables of the jobs in the pool and hands out tasks among them
+ * according to the pool's {@link SchedulingMode}.
+ */
+public class PoolSchedulable extends Schedulable {
+  public static final Log LOG = LogFactory.getLog(
+      PoolSchedulable.class.getName());
+
+  private FairScheduler scheduler;
+  private Pool pool;
+  private TaskType taskType;
+  private PoolManager poolMgr;
+  /** Schedulables (of this pool's task type) for the jobs added to the pool */
+  private List<JobSchedulable> jobScheds = new LinkedList<JobSchedulable>();
+  /** Sum of the jobs' demands as of the last call to updateDemand() */
+  private int demand = 0;
+
+  // Variables used for preemption
+  long lastTimeAtMinShare;
+  long lastTimeAtHalfFairShare;
+
+  /**
+   * Create the Schedulable for the given pool and task type. Both preemption
+   * timestamps are initialized to the scheduler clock's current time.
+   */
+  public PoolSchedulable(FairScheduler scheduler, Pool pool, TaskType type) {
+    this.scheduler = scheduler;
+    this.pool = pool;
+    this.taskType = type;
+    this.poolMgr = scheduler.getPoolManager();
+    long now = scheduler.getClock().getTime();
+    this.lastTimeAtHalfFairShare = now;
+    this.lastTimeAtMinShare = now;
+  }
+
+  /**
+   * Add the job's map or reduce Schedulable (whichever matches this pool
+   * Schedulable's task type) to the pool.
+   */
+  public void addJob(JobInProgress job) {
+    JobInfo info = scheduler.getJobInfo(job);
+    if (taskType == TaskType.MAP) {
+      jobScheds.add(info.mapSchedulable);
+    } else {
+      jobScheds.add(info.reduceSchedulable);
+    }
+  }
+
+  /**
+   * Remove the first Schedulable belonging to the given job, if there is one.
+   */
+  public void removeJob(JobInProgress job) {
+    Iterator<JobSchedulable> it = jobScheds.iterator();
+    while (it.hasNext()) {
+      if (it.next().getJob() == job) {
+        it.remove();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Refresh each job's demand and recompute the pool's demand as their sum
+   */
+  @Override
+  public void updateDemand() {
+    demand = 0;
+    for (JobSchedulable jobSched: jobScheds) {
+      jobSched.updateDemand();
+      demand = demand + jobSched.getDemand();
+    }
+  }
+
+  /**
+   * Split the pool's fair share among its jobs. Only pools in FAIR mode
+   * compute per-job shares; in any other mode every job's share is set to 0.
+   */
+  @Override
+  public void redistributeShare() {
+    if (pool.getSchedulingMode() != SchedulingMode.FAIR) {
+      for (JobSchedulable jobSched: jobScheds) {
+        jobSched.setFairShare(0);
+      }
+      return;
+    }
+    SchedulingAlgorithms.computeFairShares(jobScheds, getFairShare());
+  }
+
+  @Override
+  public int getDemand() {
+    return demand;
+  }
+
+  /** Min share is the pool's allocation for this task type. */
+  @Override
+  public int getMinShare() {
+    return poolMgr.getAllocation(pool.getName(), taskType);
+  }
+
+  @Override
+  public double getWeight() {
+    return poolMgr.getPoolWeight(pool.getName());
+  }
+
+  /** Pools always report NORMAL priority. */
+  @Override
+  public JobPriority getPriority() {
+    return JobPriority.NORMAL;
+  }
+
+  /** Total number of running tasks over all jobs in the pool. */
+  @Override
+  public int getRunningTasks() {
+    int total = 0;
+    for (JobSchedulable jobSched: jobScheds) {
+      total += jobSched.getRunningTasks();
+    }
+    return total;
+  }
+
+  /** Pools always report a start time of 0. */
+  @Override
+  public long getStartTime() {
+    return 0;
+  }
+
+  /**
+   * Sort the pool's jobs with the comparator for the pool's scheduling mode
+   * and return the first task that any job, taken in that order, offers.
+   *
+   * @return a task to launch, or null if no job in the pool provided one
+   */
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    Comparator<Schedulable> order = comparatorFor(pool.getSchedulingMode());
+    Collections.sort(jobScheds, order);
+    for (JobSchedulable jobSched: jobScheds) {
+      Task task = jobSched.assignTask(tts, currentTime, visited);
+      if (task != null) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /** Pick the job ordering that implements the given scheduling mode. */
+  private Comparator<Schedulable> comparatorFor(SchedulingMode mode) {
+    if (mode == SchedulingMode.FIFO) {
+      return new SchedulingAlgorithms.FifoComparator();
+    }
+    if (mode == SchedulingMode.FAIR) {
+      return new SchedulingAlgorithms.FairShareComparator();
+    }
+    throw new RuntimeException("Unsupported pool scheduling mode " + mode);
+  }
+
+  @Override
+  public String getName() {
+    return pool.getName();
+  }
+
+  Pool getPool() {
+    return pool;
+  }
+
+  public TaskType getTaskType() {
+    return taskType;
+  }
+
+  /** Returns the internal list of job Schedulables itself, not a copy. */
+  public Collection<JobSchedulable> getJobSchedulables() {
+    return jobScheds;
+  }
+
+  public long getLastTimeAtMinShare() {
+    return lastTimeAtMinShare;
+  }
+
+  public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
+    this.lastTimeAtMinShare = lastTimeAtMinShare;
+  }
+
+  public long getLastTimeAtHalfFairShare() {
+    return lastTimeAtHalfFairShare;
+  }
+
+  public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
+    this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A Schedulable represents an entity that can launch tasks, such as a job
+ * or a pool. It provides a common interface so that algorithms such as fair
+ * sharing can be applied both within a pool and across pools. There are 
+ * currently two types of Schedulables: JobSchedulables, which represent a
+ * single job, and PoolSchedulables, which allocate among jobs in their pool.
+ * 
+ * Separate sets of Schedulables are used for maps and reduces. Each pool has
+ * both a mapSchedulable and a reduceSchedulable, and so does each job.
+ * 
+ * A Schedulable is responsible for three roles:
+ * 1) It can launch tasks through assignTask().
+ * 2) It provides information about the job/pool to the scheduler, including:
+ *    - Demand (maximum number of tasks required)
+ *    - Number of currently running tasks
+ *    - Minimum share (for pools)
+ *    - Job/pool weight (for fair sharing)
+ *    - Start time and priority (for FIFO)
+ * 3) It can be assigned a fair share, for use with fair scheduling.
+ * 
+ * Schedulable also contains two methods for performing scheduling computations:
+ * - updateDemand() is called periodically to compute the demand of the various
+ *   jobs and pools, which may be expensive (e.g. jobs must iterate through all
+ *   their tasks to count failed tasks, tasks that can be speculated, etc).
+ * - redistributeShare() is called after demands are updated and a Schedulable's
+ *   fair share has been set by its parent to let it distribute its share among
+ *   the other Schedulables within it (e.g. for pools that want to perform fair
+ *   sharing among their jobs).
+ */
+abstract class Schedulable {
+  /** Fair share assigned to this Schedulable */
+  private double fairShare = 0;
+
+  /**
+   * Name of job/pool, used for debugging as well as for breaking ties in
+   * scheduling order deterministically. 
+   */
+  public abstract String getName();
+  
+  /**
+   * Maximum number of tasks required by this Schedulable. This is defined as
+   * number of currently running tasks + number of unlaunched tasks (tasks that
+   * are either not yet launched or need to be speculated).
+   */
+  public abstract int getDemand();
+  
+  /** Number of tasks the schedulable is currently running. */
+  public abstract int getRunningTasks();
+  
+  /** Minimum share slots assigned to the schedulable. */
+  public abstract int getMinShare();
+  
+  /** Job/pool weight in fair sharing. */
+  public abstract double getWeight();
+  
+  /** Job priority for jobs in FIFO pools; meaningless for PoolSchedulables. */
+  public abstract JobPriority getPriority();
+  
+  /** Start time for jobs in FIFO pools; meaningless for PoolSchedulables. */
+  public abstract long getStartTime();
+  
+  /** Refresh the Schedulable's demand and those of its children if any. */
+  public abstract void updateDemand();
+  
+  /** 
+   * Distribute the fair share assigned to this Schedulable among its 
+   * children (used in pools where the internal scheduler is fair sharing). 
+   */
+  public abstract void redistributeShare();
+  
+  /**
+   * Obtain a task for a given TaskTracker, or null if the Schedulable has
+   * no tasks to launch at this moment or does not wish to launch a task on
+   * this TaskTracker (e.g. is waiting for a TaskTracker with local data). 
+   * In addition, if a job is skipped during this search because it is waiting
+   * for a TaskTracker with local data, this method is expected to add it to
+   * the <tt>visited</tt> collection passed in, so that the scheduler can
+   * properly mark it as skipped during this heartbeat. Please see
+   * {@link FairScheduler#getAllowedLocalityLevel(JobInProgress, long)}
+   * for details of delay scheduling (waiting for trackers with local data).
+   * 
+   * @param tts      TaskTracker that the task will be launched on
+   * @param currentTime Cached time (to prevent excessive calls to gettimeofday)
+   * @param visited  A Collection to which this method must add all jobs that
+   *                 were considered during the search for a job to assign.
+   * @return Task to launch, or null if Schedulable cannot currently launch one.
+   * @throws IOException Possible if obtainNew(Map|Reduce)Task throws exception.
+   */
+  public abstract Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException;
+
+  /** Assign a fair share to this Schedulable. */
+  public void setFairShare(double fairShare) {
+    this.fairShare = fairShare;
+  }
+  
+  /** Get the fair share assigned to this Schedulable. */
+  public double getFairShare() {
+    return fairShare;
+  }
+  
+  /** Convenient toString implementation for debugging. */
+  @Override
+  public String toString() {
+    return String.format("[%s, demand=%d, running=%d, share=%.1f, w=%.1f]",
+        getName(), getDemand(), getRunningTasks(), fairShare, getWeight());
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class containing scheduling algorithms used in the fair scheduler.
+ */
+class SchedulingAlgorithms {
+  public static final Log LOG = LogFactory.getLog(
+      SchedulingAlgorithms.class.getName());
+
+  /**
+   * Three-way comparison of the start times of two Schedulables. The values
+   * are compared directly rather than through the sign of their difference,
+   * because a long subtraction can overflow and report the wrong order.
+   *
+   * @return -1, 0 or 1 as s1 started before, together with, or after s2
+   */
+  private static int compareStartTimes(Schedulable s1, Schedulable s2) {
+    long t1 = s1.getStartTime();
+    long t2 = s2.getStartTime();
+    return t1 < t2 ? -1 : (t1 > t2 ? 1 : 0);
+  }
+
+  /**
+   * Compare Schedulables in order of priority and then submission time, as in
+   * the default FIFO scheduler in Hadoop.
+   */
+  public static class FifoComparator implements Comparator<Schedulable> {
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      int res = s1.getPriority().compareTo(s2.getPriority());
+      if (res == 0) {
+        res = compareStartTimes(s1, s2);
+      }
+      if (res == 0) {
+        // In the rare case where jobs were submitted at the exact same time,
+        // compare them by name (which will be the JobID) to get a deterministic
+        // ordering, so we don't alternately launch tasks from different jobs.
+        res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Compare Schedulables via weighted fair sharing. In addition, Schedulables
+   * below their min share get priority over those whose min share is met.
+   *
+   * Schedulables below their min share are compared by how far below it they
+   * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
+   * and job B has 50 out of a min share of 100, then job B is scheduled next,
+   * because B is at 50% of its min share and A is at 80% of its min share.
+   *
+   * Schedulables above their min share are compared by (runningTasks / weight).
+   * If all weights are equal, slots are given to the job with the fewest tasks;
+   * otherwise, jobs with more weight get proportionally more slots.
+   */
+  public static class FairShareComparator implements Comparator<Schedulable> {
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      double minShareRatio1, minShareRatio2;
+      double tasksToWeightRatio1, tasksToWeightRatio2;
+      // A min share larger than the demand cannot be used, so cap it
+      int minShare1 = Math.min(s1.getMinShare(), s1.getDemand());
+      int minShare2 = Math.min(s2.getMinShare(), s2.getDemand());
+      boolean s1Needy = s1.getRunningTasks() < minShare1;
+      boolean s2Needy = s2.getRunningTasks() < minShare2;
+      // Math.max(..., 1.0) avoids dividing by a min share of zero
+      minShareRatio1 = s1.getRunningTasks() / Math.max(minShare1, 1.0);
+      minShareRatio2 = s2.getRunningTasks() / Math.max(minShare2, 1.0);
+      tasksToWeightRatio1 = s1.getRunningTasks() / s1.getWeight();
+      tasksToWeightRatio2 = s2.getRunningTasks() / s2.getWeight();
+      int res = 0;
+      if (s1Needy && !s2Needy)
+        res = -1;
+      else if (s2Needy && !s1Needy)
+        res = 1;
+      else if (s1Needy && s2Needy)
+        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
+      else // Neither schedulable is needy
+        res = (int) Math.signum(tasksToWeightRatio1 - tasksToWeightRatio2);
+      if (res == 0) {
+        // Jobs are tied in fairness ratio. Break the tie by submit time and job
+        // name to get a deterministic ordering, which is useful for unit tests.
+        res = compareStartTimes(s1, s2);
+        if (res == 0)
+          res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Number of iterations for the binary search in computeFairShares. This is
+   * equivalent to the number of bits of precision in the output. 25 iterations
+   * gives precision better than 0.1 slots in clusters with one million slots.
+   */
+  private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+  /**
+   * Bound at which computeFairShares stops doubling its upper estimate of the
+   * weight-to-slot ratio. Doubling stops once the ratio reaches this value, so
+   * the ratio always stays finite. An infinite ratio would turn the share of a
+   * zero-weight Schedulable into 0 * Infinity = NaN.
+   */
+  private static final double MAX_WEIGHT_TO_SLOT_RATIO =
+      Double.MAX_VALUE / 2.0;
+
+  /**
+   * Given a set of Schedulables and a number of slots, compute their weighted
+   * fair shares. The min shares and demands of the Schedulables are assumed to
+   * be set beforehand. We compute the fairest possible allocation of shares
+   * to the Schedulables that respects their min shares and demands.
+   *
+   * To understand what this method does, we must first define what weighted
+   * fair sharing means in the presence of minimum shares and demands. If there
+   * were no minimum shares and every Schedulable had an infinite demand (i.e.
+   * could launch infinitely many tasks), then weighted fair sharing would be
+   * achieved if the ratio of slotsAssigned / weight was equal for each
+   * Schedulable and all slots were assigned. Minimum shares and demands add
+   * two further twists:
+   * - Some Schedulables may not have enough tasks to fill all their share.
+   * - Some Schedulables may have a min share higher than their assigned share.
+   *
+   * To deal with these possibilities, we define an assignment of slots as
+   * being fair if there exists a ratio R such that:
+   * - Schedulables S where S.demand < R * S.weight are assigned share S.demand
+   * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
+   * - All other Schedulables S are assigned share R * S.weight
+   * - The sum of all the shares is totalSlots.
+   *
+   * We call R the weight-to-slots ratio because it converts a Schedulable's
+   * weight to the number of slots it is assigned.
+   *
+   * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+   * To do this, we use binary search. Given a ratio R, we compute the number
+   * of slots that would be used in total with this ratio (the sum of the shares
+   * computed using the conditions above). If this number of slots is less than
+   * totalSlots, then R is too small and more slots could be assigned. If the
+   * number of slots is more than totalSlots, then R is too large.
+   *
+   * We begin the binary search with a lower bound on R of 0 (which means that
+   * all Schedulables are only given their minShare) and an upper bound computed
+   * to be large enough that too many slots are given (by doubling R until we
+   * either use more than totalSlots slots or we fulfill all jobs' demands).
+   * The doubling also stops at a large finite bound, which is only reached
+   * when no ratio can use that many slots (for instance when a Schedulable
+   * with unmet demand has a weight of zero).
+   * The helper method slotsUsedWithWeightToSlotRatio computes the total number
+   * of slots used with a given value of R.
+   *
+   * The running time of this algorithm is linear in the number of Schedulables,
+   * because slotsUsedWithWeightToSlotRatio is linear-time and the number of
+   * iterations of binary search is a constant (dependent on desired precision).
+   *
+   * @param schedulables Schedulables whose fair shares are to be set
+   * @param totalSlots   number of slots to divide among them
+   */
+  public static void computeFairShares(
+      Collection<? extends Schedulable> schedulables, double totalSlots) {
+    // Find an upper bound on R that we can use in our binary search. We start
+    // at R = 1 and double it until we have either used totalSlots slots or we
+    // have met all Schedulables' demands (if total demand < totalSlots).
+    double totalDemand = 0;
+    for (Schedulable sched: schedulables) {
+      totalDemand += sched.getDemand();
+    }
+    double cap = Math.min(totalDemand, totalSlots);
+    double rMax = 1.0;
+    // The bound on rMax keeps it finite when cap can never be reached;
+    // without it rMax would overflow to Infinity and produce NaN shares.
+    while (rMax < MAX_WEIGHT_TO_SLOT_RATIO
+        && slotsUsedWithWeightToSlotRatio(rMax, schedulables) < cap) {
+      rMax *= 2.0;
+    }
+    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+    double left = 0;
+    double right = rMax;
+    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+      double mid = (left + right) / 2.0;
+      if (slotsUsedWithWeightToSlotRatio(mid, schedulables) < cap) {
+        left = mid;
+      } else {
+        right = mid;
+      }
+    }
+    // Set the fair shares based on the value of R we've converged to
+    for (Schedulable sched: schedulables) {
+      sched.setFairShare(computeShare(sched, right));
+    }
+  }
+
+  /**
+   * Compute the number of slots that would be used given a weight-to-slot
+   * ratio w2sRatio, for use in the computeFairShares algorithm as described
+   * in {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+   */
+  private static double slotsUsedWithWeightToSlotRatio(double w2sRatio,
+      Collection<? extends Schedulable> schedulables) {
+    double slotsTaken = 0;
+    for (Schedulable sched: schedulables) {
+      double share = computeShare(sched, w2sRatio);
+      slotsTaken += share;
+    }
+    return slotsTaken;
+  }
+
+  /**
+   * Compute the number of slots assigned to a Schedulable given a particular
+   * weight-to-slot ratio w2sRatio, for use in computeFairShares as described
+   * in {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+   * The share is weight * w2sRatio, raised to at least the min share and then
+   * capped at the demand (so the demand wins if it is below the min share).
+   */
+  private static double computeShare(Schedulable sched, double w2sRatio) {
+    double share = sched.getWeight() * w2sRatio;
+    share = Math.max(share, sched.getMinShare());
+    share = Math.min(share, sched.getDemand());
+    return share;
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Internal scheduling modes for pools.
+ */
+public enum SchedulingMode {
+  /** Jobs in the pool are ordered by weighted fair sharing. */
+  FAIR,
+  /** Jobs in the pool are ordered by priority and then start time. */
+  FIFO
+}

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java Fri Aug 14 16:32:04 2009
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A pluggable object for selecting tasks to run from a {@link JobInProgress} on
@@ -86,7 +87,7 @@
    * @throws IOException 
    */
   public abstract Task obtainNewMapTask(TaskTrackerStatus taskTracker,
-      JobInProgress job) throws IOException;
+      JobInProgress job, int localityLevel) throws IOException;
 
   /**
    * Choose a reduce task to run from the given job on the given TaskTracker.

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Dummy implementation of Schedulable for unit testing.
+ */
+public class FakeSchedulable extends Schedulable {
+  // Fixed values handed back by the corresponding getters
+  private int demand;
+  private int runningTasks;
+  private int minShare;
+  private double weight;
+  private JobPriority priority;
+  private long startTime;
+
+  /** Schedulable with zero demand, zero min share and weight 1. */
+  public FakeSchedulable() {
+    this(0, 0, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /** Schedulable with the given demand, zero min share and weight 1. */
+  public FakeSchedulable(int demand) {
+    this(demand, 0, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /** Schedulable with the given demand and min share, and weight 1. */
+  public FakeSchedulable(int demand, int minShare) {
+    this(demand, minShare, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /** Schedulable with the given demand, min share and weight. */
+  public FakeSchedulable(int demand, int minShare, double weight) {
+    this(demand, minShare, weight, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /**
+   * Fully specified constructor; the shorter constructors delegate here with
+   * fair share 0, no running tasks, NORMAL priority and start time 0.
+   */
+  public FakeSchedulable(int demand, int minShare, double weight, int fairShare,
+      int runningTasks, JobPriority priority, long startTime) {
+    this.demand = demand;
+    this.minShare = minShare;
+    this.weight = weight;
+    setFairShare(fairShare);
+    this.runningTasks = runningTasks;
+    this.priority = priority;
+    this.startTime = startTime;
+  }
+
+  /** The name is built from this object's hash code. */
+  @Override
+  public String getName() {
+    return "FakeSchedulable" + hashCode();
+  }
+
+  @Override
+  public int getDemand() {
+    return this.demand;
+  }
+
+  @Override
+  public int getRunningTasks() {
+    return this.runningTasks;
+  }
+
+  @Override
+  public int getMinShare() {
+    return this.minShare;
+  }
+
+  @Override
+  public double getWeight() {
+    return this.weight;
+  }
+
+  @Override
+  public JobPriority getPriority() {
+    return this.priority;
+  }
+
+  @Override
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  /** Never has a task to launch. */
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    return null;
+  }
+
+  /** No-op: there are no children to refresh. */
+  @Override
+  public void updateDemand() {}
+
+  /** No-op: there are no children to share with. */
+  @Override
+  public void redistributeShare() {}
+}

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercise the computeFairShares method in SchedulingAlgorithms.
+ */
+public class TestComputeFairShares extends TestCase {
+  /** Schedulables under test; recreated empty before every test case. */
+  private List<Schedulable> scheds;
+
+  /** Start each test with an empty list of schedulables. */
+  @Override
+  protected void setUp() throws Exception {
+    scheds = new ArrayList<Schedulable>();
+  }
+
+  /**
+   * Basic test - pools with different demands that are all higher than their
+   * fair share (of 10 slots) should each get their fair share.
+   */
+  public void testEqualSharing() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(30));
+    scheds.add(new FakeSchedulable(20));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(10, 10, 10, 10);
+  }
+
+  /**
+   * In this test, pool 4 has a smaller demand than the 40 / 4 = 10 slots that
+   * it would be assigned with equal sharing. It should only get the 3 slots
+   * it demands. The other pools must then split the remaining 37 slots, but
+   * pool 3, with 11 slots demanded, is now below its share of 37/3 ~= 12.3,
+   * so it only gets 11 slots. Pools 1 and 2 split the rest and get 13 each.
+   */
+  public void testLowDemands() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(11));
+    scheds.add(new FakeSchedulable(3));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(13, 13, 11, 3);
+  }
+
+  /**
+   * In this test, some pools have minimum shares set. Pool 1 has a min share
+   * of 20 so it gets 20 slots. Pool 2 also has a min share of 20, but its
+   * demand is only 10 so it can only get 10 slots. The remaining pools have
+   * 10 slots to split between them. Pool 4 gets 3 slots because its demand is
+   * only 3, and pool 3 gets the remaining 7 slots. Pool 4 also had a min share
+   * of 2 slots but this should not affect the outcome.
+   */
+  public void testMinShares() {
+    scheds.add(new FakeSchedulable(100, 20));
+    scheds.add(new FakeSchedulable(10, 20));
+    scheds.add(new FakeSchedulable(10, 0));
+    scheds.add(new FakeSchedulable(3, 2));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(20, 10, 7, 3);
+  }
+
+  /**
+   * Basic test for weighted shares with no minimum shares and no low demands.
+   * Each pool should get slots in proportion to its weight.
+   */
+  public void testWeightedSharing() {
+    scheds.add(new FakeSchedulable(100, 0, 2.0));
+    scheds.add(new FakeSchedulable(50,  0, 1.0));
+    scheds.add(new FakeSchedulable(30,  0, 1.0));
+    scheds.add(new FakeSchedulable(20,  0, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(20, 10, 10, 5);
+  }
+
+  /**
+   * Weighted sharing test where pools 1 and 2 are now given lower demands than
+   * above. Pool 1 stops at 10 slots, leaving 35. If the remaining pools split
+   * this into a 1:1:0.5 ratio, they would get 14:14:7 slots respectively, but
+   * pool 2's demand is only 11, so it only gets 11. The remaining 2 pools split
+   * the 24 slots left into a 1:0.5 ratio, getting 16 and 8 slots respectively.
+   */
+  public void testWeightedSharingWithLowDemands() {
+    scheds.add(new FakeSchedulable(10, 0, 2.0));
+    scheds.add(new FakeSchedulable(11, 0, 1.0));
+    scheds.add(new FakeSchedulable(30, 0, 1.0));
+    scheds.add(new FakeSchedulable(20, 0, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(10, 11, 16, 8);
+  }
+
+  /**
+   * Weighted fair sharing test with min shares. As with pool 2 in the min
+   * share test above, pool 1 has a min share greater than its demand so it
+   * only gets its demand (10 slots). Pool 4 has a min share of 15 even though
+   * its weight is very small, so it gets 15 slots. Pools 2 and 3 share the
+   * remaining 20 slots equally, getting 10 each. Pool 3's min share of 5 slots
+   * doesn't affect this.
+   */
+  public void testWeightedSharingWithMinShares() {
+    scheds.add(new FakeSchedulable(10, 20, 2.0));
+    scheds.add(new FakeSchedulable(11, 0, 1.0));
+    scheds.add(new FakeSchedulable(30, 5, 1.0));
+    scheds.add(new FakeSchedulable(20, 15, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(10, 10, 10, 15);
+  }
+
+  /**
+   * Test that shares are computed accurately even when there are more pools
+   * than available slots, so that each share is a fraction of a slot.
+   */
+  public void testSmallShares() {
+    scheds.add(new FakeSchedulable(10));
+    scheds.add(new FakeSchedulable(5));
+    scheds.add(new FakeSchedulable(3));
+    scheds.add(new FakeSchedulable(2));
+    SchedulingAlgorithms.computeFairShares(scheds, 1);
+    verifyShares(0.25, 0.25, 0.25, 0.25);
+  }
+
+  /**
+   * Test that shares are computed accurately even when the number of slots is
+   * very large.
+   */
+  public void testLargeShares() {
+    int million = 1000 * 1000;
+    scheds.add(new FakeSchedulable(100 * million));
+    scheds.add(new FakeSchedulable(50 * million));
+    scheds.add(new FakeSchedulable(30 * million));
+    scheds.add(new FakeSchedulable(20 * million));
+    SchedulingAlgorithms.computeFairShares(scheds, 40 * million);
+    verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
+  }
+
+  /**
+   * Test that having a pool with 0 demand doesn't confuse the algorithm.
+   */
+  public void testZeroDemand() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(30));
+    scheds.add(new FakeSchedulable(0));
+    SchedulingAlgorithms.computeFairShares(scheds, 30);
+    verifyShares(10, 10, 10, 0);
+  }
+
+  /**
+   * Test that being called on an empty list doesn't confuse the algorithm.
+   */
+  public void testEmptyList() {
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares();
+  }
+
+  /**
+   * Check that the given fair shares have been assigned, in order, to the
+   * schedulables in this.scheds.
+   *
+   * @param shares expected fair share of each schedulable, listed in the
+   *        order the schedulables were added to this.scheds; each value is
+   *        compared with a tolerance of 0.01
+   */
+  private void verifyShares(double... shares) {
+    // JUnit expects (message, expected, actual); the expected count is the
+    // number of shares the test case listed.
+    assertEquals("Number of schedulables", shares.length, scheds.size());
+    for (int i = 0; i < shares.length; i++) {
+      // Pools are numbered from 1 in the test descriptions.
+      assertEquals("Fair share of pool " + (i + 1),
+          shares[i], scheds.get(i).getFairShare(), 0.01);
+    }
+  }
+}