You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/09/12 01:57:38 UTC

svn commit: r1169585 [2/5] - in /hadoop/common/branches/branch-0.20-security: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/co...

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Sun Sep 11 23:57:37 2011
@@ -25,28 +25,47 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * A {@link TaskScheduler} that implements fair sharing.
  */
 public class FairScheduler extends TaskScheduler {
-  /** How often fair shares are re-calculated */
-  public static final long UPDATE_INTERVAL = 500;
   public static final Log LOG = LogFactory.getLog(
       "org.apache.hadoop.mapred.FairScheduler");
+
+  // How often fair shares are re-calculated
+  protected long updateInterval = 500;
+
+  // How often to dump scheduler state to the event log
+  protected long dumpInterval = 10000;
+  
+  // How often tasks are preempted (must be longer than a couple
+  // of heartbeats to give task-kill commands a chance to act).
+  protected long preemptionInterval = 15000;
+  
+  // Used to iterate through map and reduce task types
+  private static final TaskType[] MAP_AND_REDUCE = 
+    new TaskType[] {TaskType.MAP, TaskType.REDUCE};
+  
+  // Maximum locality delay when auto-computing locality delays
+  private static final long MAX_AUTOCOMPUTED_LOCALITY_DELAY = 15000;
   
   protected PoolManager poolMgr;
   protected LoadManager loadMgr;
@@ -55,16 +74,29 @@ public class FairScheduler extends TaskS
   protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
     new HashMap<JobInProgress, JobInfo>();
   protected long lastUpdateTime;           // Time when we last updated infos
+  protected long lastPreemptionUpdateTime; // Time when we last updated preemption vars
   protected boolean initialized;  // Are we initialized?
   protected volatile boolean running; // Are we running?
-  protected boolean useFifo;      // Set if we want to revert to FIFO behavior
   protected boolean assignMultiple; // Simultaneously assign map and reduce?
+  protected int mapAssignCap = -1;    // Max maps to launch per heartbeat
+  protected int reduceAssignCap = -1; // Max reduces to launch per heartbeat
+  protected long nodeLocalityDelay;   // Time to wait for node locality
+  protected long rackLocalityDelay;   // Time to wait for rack locality
+  protected boolean autoComputeLocalityDelay = false; // Compute locality delay
+                                                      // from heartbeat interval
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected boolean waitForMapsBeforeLaunchingReduces = true;
+  protected boolean preemptionEnabled;
+  protected boolean onlyLogPreemption; // Only log when tasks should be killed
   private Clock clock;
-  private boolean runBackgroundUpdates; // Can be set to false for testing
-  private EagerTaskInitializationListener eagerInitListener;
   private JobListener jobListener;
+  private JobInitializer jobInitializer;
+  private boolean mockMode; // Used for unit tests; disables background updates
+                            // and scheduler event log
+  private FairSchedulerEventLog eventLog;
+  protected long lastDumpTime;       // Time when we last dumped state to log
+  protected long lastHeartbeatTime;  // Time we last ran assignTasks 
+  private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
   
   /**
    * A class for holding per-job scheduler variables. These always contain the
@@ -73,30 +105,32 @@ public class FairScheduler extends TaskS
    */
   static class JobInfo {
     boolean runnable = false;   // Can the job run given user/pool limits?
-    double mapWeight = 0;       // Weight of job in calculation of map share
-    double reduceWeight = 0;    // Weight of job in calculation of reduce share
-    long mapDeficit = 0;        // Time deficit for maps
-    long reduceDeficit = 0;     // Time deficit for reduces
-    int runningMaps = 0;        // Maps running at last update
-    int runningReduces = 0;     // Reduces running at last update
-    int neededMaps;             // Maps needed at last update
-    int neededReduces;          // Reduces needed at last update
-    int minMaps = 0;            // Minimum maps as guaranteed by pool
-    int minReduces = 0;         // Minimum reduces as guaranteed by pool
-    double mapFairShare = 0;    // Fair share of map slots at last update
-    double reduceFairShare = 0; // Fair share of reduce slots at last update
+    // Does this job need to be initialized?
+    volatile boolean needsInitializing = true;
+    public JobSchedulable mapSchedulable;
+    public JobSchedulable reduceSchedulable;
+    // Variables used for delay scheduling
+    LocalityLevel lastMapLocalityLevel; // Locality level of last map launched
+    long timeWaitedForLocalMap; // Time waiting for local map since last map
+    boolean skippedAtLastHeartbeat;  // Was job skipped at previous assignTasks?
+                                     // (used to update timeWaitedForLocalMap)
+    public JobInfo(JobSchedulable mapSched, JobSchedulable reduceSched) {
+      this.mapSchedulable = mapSched;
+      this.reduceSchedulable = reduceSched;
+      this.lastMapLocalityLevel = LocalityLevel.NODE;
+    }
   }
   
   public FairScheduler() {
-    this(new Clock(), true);
+    this(new Clock(), false);
   }
   
   /**
    * Constructor used for tests, which can change the clock and disable updates.
    */
-  protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
+  protected FairScheduler(Clock clock, boolean mockMode) {
     this.clock = clock;
-    this.runBackgroundUpdates = runBackgroundUpdates;
+    this.mockMode = mockMode;
     this.jobListener = new JobListener();
   }
 
@@ -104,16 +138,27 @@ public class FairScheduler extends TaskS
   public void start() {
     try {
       Configuration conf = getConf();
-      this.eagerInitListener = new EagerTaskInitializationListener(conf);
-      eagerInitListener.setTaskTrackerManager(taskTrackerManager);
-      eagerInitListener.start();
-      taskTrackerManager.addJobInProgressListener(eagerInitListener);
+      // Create scheduling log and initialize it if it is enabled
+      eventLog = new FairSchedulerEventLog();
+      boolean logEnabled = conf.getBoolean(
+          "mapred.fairscheduler.eventlog.enabled", false);
+      if (!mockMode && logEnabled) {
+        String hostname = "localhost";
+        if (taskTrackerManager instanceof JobTracker) {
+          hostname = ((JobTracker) taskTrackerManager).getJobTrackerMachine();
+        }
+        eventLog.init(conf, hostname);
+      }
+      // Initialize other pieces of the scheduler
+      jobInitializer = new JobInitializer(conf, taskTrackerManager);
       taskTrackerManager.addJobInProgressListener(jobListener);
-      poolMgr = new PoolManager(conf);
+      poolMgr = new PoolManager(this);
+      poolMgr.initialize();
       loadMgr = (LoadManager) ReflectionUtils.newInstance(
           conf.getClass("mapred.fairscheduler.loadmanager", 
               CapBasedLoadManager.class, LoadManager.class), conf);
       loadMgr.setTaskTrackerManager(taskTrackerManager);
+      loadMgr.setEventLog(eventLog);
       loadMgr.start();
       taskSelector = (TaskSelector) ReflectionUtils.newInstance(
           conf.getClass("mapred.fairscheduler.taskselector", 
@@ -126,16 +171,41 @@ public class FairScheduler extends TaskS
         weightAdjuster = (WeightAdjuster) ReflectionUtils.newInstance(
             weightAdjClass, conf);
       }
-      assignMultiple = conf.getBoolean("mapred.fairscheduler.assignmultiple",
-          false);
-      sizeBasedWeight = conf.getBoolean("mapred.fairscheduler.sizebasedweight",
-          false);
+      updateInterval = conf.getLong(
+          "mapred.fairscheduler.update.interval", 500);
+      dumpInterval = conf.getLong(
+          "mapred.fairscheduler.dump.interval", 10000);
+      preemptionInterval = conf.getLong(
+          "mapred.fairscheduler.preemption.interval", 15000);
+      assignMultiple = conf.getBoolean(
+          "mapred.fairscheduler.assignmultiple", true);
+      mapAssignCap = conf.getInt(
+          "mapred.fairscheduler.assignmultiple.maps", -1);
+      reduceAssignCap = conf.getInt(
+          "mapred.fairscheduler.assignmultiple.reduces", -1);
+      sizeBasedWeight = conf.getBoolean(
+          "mapred.fairscheduler.sizebasedweight", false);
+      preemptionEnabled = conf.getBoolean(
+          "mapred.fairscheduler.preemption", false);
+      onlyLogPreemption = conf.getBoolean(
+          "mapred.fairscheduler.preemption.only.log", false);
+      long defaultDelay = conf.getLong(
+          "mapred.fairscheduler.locality.delay", -1);
+      nodeLocalityDelay = conf.getLong(
+          "mapred.fairscheduler.locality.delay.node", defaultDelay);
+      rackLocalityDelay = conf.getLong(
+          "mapred.fairscheduler.locality.delay.rack", defaultDelay);
+      if (defaultDelay == -1 && 
+          (nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
+        autoComputeLocalityDelay = true; // Compute from heartbeat interval
+      }
       initialized = true;
       running = true;
       lastUpdateTime = clock.getTime();
       // Start a thread to update deficits every UPDATE_INTERVAL
-      if (runBackgroundUpdates)
+      if (!mockMode) {
         new UpdateThread().start();
+      }
       // Register servlet with JobTracker's Jetty server
       if (taskTrackerManager instanceof JobTracker) {
         JobTracker jobTracker = (JobTracker) taskTrackerManager;
@@ -144,6 +214,10 @@ public class FairScheduler extends TaskS
         infoServer.addServlet("scheduler", "/scheduler",
             FairSchedulerServlet.class);
       }
+      
+      initMetrics();
+      
+      eventLog.log("INITIALIZED");
     } catch (Exception e) {
       // Can't load one of the managers - crash the JobTracker now while it is
       // starting up so that the user notices.
@@ -152,25 +226,94 @@ public class FairScheduler extends TaskS
     LOG.info("Successfully configured FairScheduler");
   }
 
+  private MetricsUpdater metricsUpdater; // responsible for pushing hadoop metrics
+
+  /**
+   * Returns the LoadManager object used by the Fair Share scheduler
+   */
+  LoadManager getLoadManager() {
+    return loadMgr;
+  }
+
+  /**
+   * Register metrics for the fair scheduler, and start a thread
+   * to update them periodically.
+   */
+  private void initMetrics() {
+    MetricsContext context = MetricsUtil.getContext("fairscheduler");
+    metricsUpdater = new MetricsUpdater();
+    context.registerUpdater(metricsUpdater);
+  }
+
   @Override
   public void terminate() throws IOException {
+    if (eventLog != null)
+      eventLog.log("SHUTDOWN");
     running = false;
+    jobInitializer.terminate();
     if (jobListener != null)
       taskTrackerManager.removeJobInProgressListener(jobListener);
-    if (eagerInitListener != null)
-      taskTrackerManager.removeJobInProgressListener(eagerInitListener);
+    if (eventLog != null)
+      eventLog.shutdown();
+    if (metricsUpdater != null) {
+      MetricsContext context = MetricsUtil.getContext("fairscheduler");
+      context.unregisterUpdater(metricsUpdater);
+      metricsUpdater = null;
+    }
+  }
+ 
+
+  private class JobInitializer {
+    private final int DEFAULT_NUM_THREADS = 1;
+    private ExecutorService threadPool;
+    private TaskTrackerManager ttm;
+    public JobInitializer(Configuration conf, TaskTrackerManager ttm) {
+      int numThreads = conf.getInt("mapred.jobinit.threads",
+          DEFAULT_NUM_THREADS);
+      threadPool = Executors.newFixedThreadPool(numThreads);
+      this.ttm = ttm;
+    }
+    public void initJob(JobInfo jobInfo, JobInProgress job) {
+      if (!mockMode) {
+        threadPool.execute(new InitJob(jobInfo, job));
+      } else {
+        new InitJob(jobInfo, job).run();
+      }
+    }
+    class InitJob implements Runnable {
+      private JobInfo jobInfo;
+      private JobInProgress job;
+      public InitJob(JobInfo jobInfo, JobInProgress job) {
+        this.jobInfo = jobInfo;
+        this.job = job;
+      }
+      public void run() {
+        ttm.initJob(job);
+      }
+    }
+    void terminate() {
+      LOG.info("Shutting down thread pool");
+      threadPool.shutdownNow();
+      try {
+        threadPool.awaitTermination(1, TimeUnit.MINUTES);
+      } catch (InterruptedException e) {
+        // Ignore, we are in shutdown anyway.
+      }
+    }
   }
-  
-  /**
+
+/**
    * Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
    */
   private class JobListener extends JobInProgressListener {
     @Override
     public void jobAdded(JobInProgress job) {
       synchronized (FairScheduler.this) {
-        poolMgr.addJob(job);
-        JobInfo info = new JobInfo();
+        eventLog.log("JOB_ADDED", job.getJobID());
+        JobInfo info = new JobInfo(new JobSchedulable(FairScheduler.this, job, TaskType.MAP),
+            new JobSchedulable(FairScheduler.this, job, TaskType.REDUCE));
         infos.put(job, info);
+        poolMgr.addJob(job); // Also adds job into the right PoolScheduable
         update();
       }
     }
@@ -178,13 +321,14 @@ public class FairScheduler extends TaskS
     @Override
     public void jobRemoved(JobInProgress job) {
       synchronized (FairScheduler.this) {
-        poolMgr.removeJob(job);
-        infos.remove(job);
+        eventLog.log("JOB_REMOVED", job.getJobID());
+        jobNoLongerRunning(job);
       }
     }
   
     @Override
     public void jobUpdated(JobChangeEvent event) {
+      eventLog.log("JOB_UPDATED", event.getJobInProgress().getJobID());
     }
   }
 
@@ -200,30 +344,50 @@ public class FairScheduler extends TaskS
     public void run() {
       while (running) {
         try {
-          Thread.sleep(UPDATE_INTERVAL);
+          Thread.sleep(updateInterval);
           update();
+          dumpIfNecessary();
+          preemptTasksIfNecessary();
         } catch (Exception e) {
-          LOG.error("Failed to update fair share calculations", e);
+          LOG.error("Exception in fair scheduler UpdateThread", e);
         }
       }
     }
   }
+
+  /**
+   * Responsible for updating metrics when the metrics context requests it.
+   */
+  private class MetricsUpdater implements Updater {
+    @Override
+    public void doUpdates(MetricsContext context) {
+      updateMetrics();
+    }    
+  }
+  
+  synchronized void updateMetrics() {
+    poolMgr.updateMetrics();
+  }
   
   @Override
   public synchronized List<Task> assignTasks(TaskTracker tracker)
       throws IOException {
     if (!initialized) // Don't try to assign tasks if we haven't yet started up
       return null;
+    String trackerName = tracker.getTrackerName();
+    eventLog.log("HEARTBEAT", trackerName);
+    long currentTime = clock.getTime();
     
-    // Reload allocations file if it hasn't been loaded in a while
-    poolMgr.reloadAllocsIfNecessary();
-    
-    // Compute total runnable maps and reduces
+    // Compute total runnable maps and reduces, and currently running ones
     int runnableMaps = 0;
+    int runningMaps = 0;
     int runnableReduces = 0;
-    for (JobInProgress job: infos.keySet()) {
-      runnableMaps += runnableTasks(job, TaskType.MAP);
-      runnableReduces += runnableTasks(job, TaskType.REDUCE);
+    int runningReduces = 0;
+    for (Pool pool: poolMgr.getPools()) {
+      runnableMaps += pool.getMapSchedulable().getDemand();
+      runningMaps += pool.getMapSchedulable().getRunningTasks();
+      runnableReduces += pool.getReduceSchedulable().getDemand();
+      runningReduces += pool.getReduceSchedulable().getRunningTasks();
     }
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
@@ -233,50 +397,123 @@ public class FairScheduler extends TaskS
     int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
     int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
     
-    // Scan to see whether any job needs to run a map, then a reduce
+    eventLog.log("RUNNABLE_TASKS", 
+        runnableMaps, runningMaps, runnableReduces, runningReduces);
+
+    // Update time waited for local maps for jobs skipped on last heartbeat
+    updateLocalityWaitTimes(currentTime);
+    
+    TaskTrackerStatus tts = tracker.getStatus();
+
+    int mapsAssigned = 0; // loop counter for map in the below while loop
+    int reducesAssigned = 0; // loop counter for reduce in the below while
+    int mapCapacity = maxTasksToAssign(TaskType.MAP, tts);
+    int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts);
+    boolean mapRejected = false; // flag used for ending the loop
+    boolean reduceRejected = false; // flag used for ending the loop
+
+    // Keep track of which jobs were visited for map tasks and which had tasks
+    // launched, so that we can later mark skipped jobs for delay scheduling
+    Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>();
+    Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>();
+    Set<JobInProgress> launchedMap = new HashSet<JobInProgress>();
+
     ArrayList<Task> tasks = new ArrayList<Task>();
-    TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
-    TaskTrackerStatus trackerStatus = tracker.getStatus();
-    for (TaskType taskType: types) {
-      boolean canAssign = (taskType == TaskType.MAP) ? 
-          loadMgr.canAssignMap(trackerStatus, runnableMaps, totalMapSlots) :
-          loadMgr.canAssignReduce(trackerStatus, runnableReduces, totalReduceSlots);
-      if (canAssign) {
-        // Figure out the jobs that need this type of task
-        List<JobInProgress> candidates = new ArrayList<JobInProgress>();
-        for (JobInProgress job: infos.keySet()) {
-          if (job.getStatus().getRunState() == JobStatus.RUNNING && 
-              neededTasks(job, taskType) > 0) {
-            candidates.add(job);
-          }
+    // Scan jobs to assign tasks until neither maps nor reduces can be assigned
+    while (true) {
+      // Computing the ending conditions for the loop
+      // Reject a task type if one of the following condition happens
+      // 1. number of assigned task reaches per heatbeat limit
+      // 2. number of running tasks reaches runnable tasks
+      // 3. task is rejected by the LoadManager.canAssign
+      if (!mapRejected) {
+        if (mapsAssigned == mapCapacity ||
+            runningMaps == runnableMaps ||
+            !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots)) {
+          eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
+          mapRejected = true;
         }
-        // Sort jobs by deficit (for Fair Sharing) or submit time (for FIFO)
-        Comparator<JobInProgress> comparator = useFifo ?
-            new FifoJobComparator() : new DeficitComparator(taskType);
-        Collections.sort(candidates, comparator);
-        for (JobInProgress job: candidates) {
-          Task task = (taskType == TaskType.MAP ? 
-              taskSelector.obtainNewMapTask(trackerStatus, job) :
-              taskSelector.obtainNewReduceTask(trackerStatus, job));
-          if (task != null) {
-            // Update the JobInfo for this job so we account for the launched
-            // tasks during this update interval and don't try to launch more
-            // tasks than the job needed on future heartbeats
-            JobInfo info = infos.get(job);
-            if (taskType == TaskType.MAP) {
-              info.runningMaps++;
-              info.neededMaps--;
-            } else {
-              info.runningReduces++;
-              info.neededReduces--;
-            }
-            tasks.add(task);
-            if (!assignMultiple)
-              return tasks;
-            break;
+      }
+      if (!reduceRejected) {
+        if (reducesAssigned == reduceCapacity ||
+            runningReduces == runnableReduces ||
+            !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots)) {
+          eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
+          reduceRejected = true;
+        }
+      }
+      // Exit while (true) loop if
+      // 1. neither maps nor reduces can be assigned
+      // 2. assignMultiple is off and we already assigned one task
+      if (mapRejected && reduceRejected ||
+          !assignMultiple && tasks.size() > 0) {
+        break; // This is the only exit of the while (true) loop
+      }
+
+      // Determine which task type to assign this time
+      // First try choosing a task type which is not rejected
+      TaskType taskType;
+      if (mapRejected) {
+        taskType = TaskType.REDUCE;
+      } else if (reduceRejected) {
+        taskType = TaskType.MAP;
+      } else {
+        // If both types are available, choose the task type with fewer running
+        // tasks on the task tracker to prevent that task type from starving
+        if (tts.countMapTasks() <= tts.countReduceTasks()) {
+          taskType = TaskType.MAP;
+        } else {
+          taskType = TaskType.REDUCE;
+        }
+      }
+
+      // Get the map or reduce schedulables and sort them by fair sharing
+      List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
+      Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
+      boolean foundTask = false;
+      for (Schedulable sched: scheds) { // This loop will assign only one task
+        eventLog.log("INFO", "Checking for " + taskType +
+            " task in " + sched.getName());
+        Task task = taskType == TaskType.MAP ? 
+                    sched.assignTask(tts, currentTime, visitedForMap) : 
+                    sched.assignTask(tts, currentTime, visitedForReduce);
+        if (task != null) {
+          foundTask = true;
+          JobInProgress job = taskTrackerManager.getJob(task.getJobID());
+          eventLog.log("ASSIGN", trackerName, taskType,
+              job.getJobID(), task.getTaskID());
+          // Update running task counts, and the job's locality level
+          if (taskType == TaskType.MAP) {
+            launchedMap.add(job);
+            mapsAssigned++;
+            runningMaps++;
+            updateLastMapLocalityLevel(job, task, tts);
+          } else {
+            reducesAssigned++;
+            runningReduces++;
           }
+          // Add task to the list of assignments
+          tasks.add(task);
+          break; // This break makes this loop assign only one task
+        } // end if(task != null)
+      } // end for(Schedulable sched: scheds)
+
+      // Reject the task type if we cannot find a task
+      if (!foundTask) {
+        if (taskType == TaskType.MAP) {
+          mapRejected = true;
+        } else {
+          reduceRejected = true;
         }
       }
+    } // end while (true)
+
+    // Mark any jobs that were visited for map tasks but did not launch a task
+    // as skipped on this heartbeat
+    for (JobInProgress job: visitedForMap) {
+      if (!launchedMap.contains(job)) {
+        infos.get(job).skippedAtLastHeartbeat = true;
+      }
     }
     
     // If no tasks were found, return null
@@ -284,40 +521,102 @@ public class FairScheduler extends TaskS
   }
 
   /**
-   * Compare jobs by deficit for a given task type, putting jobs whose current
-   * allocation is less than their minimum share always ahead of others. This is
-   * the default job comparator used for Fair Sharing.
-   */
-  private class DeficitComparator implements Comparator<JobInProgress> {
-    private final TaskType taskType;
-
-    private DeficitComparator(TaskType taskType) {
-      this.taskType = taskType;
-    }
-
-    public int compare(JobInProgress j1, JobInProgress j2) {
-      // Put needy jobs ahead of non-needy jobs (where needy means must receive
-      // new tasks to meet slot minimum), comparing among jobs of the same type
-      // by deficit so as to put jobs with higher deficit ahead.
-      JobInfo j1Info = infos.get(j1);
-      JobInfo j2Info = infos.get(j2);
-      long deficitDif;
-      boolean j1Needy, j2Needy;
-      if (taskType == TaskType.MAP) {
-        j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
-        j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
-        deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
-      } else {
-        j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
-        j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);
-        deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
-      }
-      if (j1Needy && !j2Needy)
-        return -1;
-      else if (j2Needy && !j1Needy)
-        return 1;
-      else // Both needy or both non-needy; compare by deficit
-        return (int) Math.signum(deficitDif);
+   * Get maximum number of tasks to assign on a TaskTracker on a heartbeat.
+   * The scheduler may launch fewer than this many tasks if the LoadManager
+   * says not to launch more, but it will never launch more than this number.
+   */
+  private int maxTasksToAssign(TaskType type, TaskTrackerStatus tts) {
+    if (!assignMultiple)
+      return 1;
+    int cap = (type == TaskType.MAP) ? mapAssignCap : reduceAssignCap;
+    if (cap == -1) // Infinite cap; use the TaskTracker's slot count
+      return (type == TaskType.MAP) ?
+          tts.getAvailableMapSlots(): tts.getAvailableReduceSlots();
+    else
+      return cap;
+  }
+
+  /**
+   * Update locality wait times for jobs that were skipped at last heartbeat.
+   */
+  private void updateLocalityWaitTimes(long currentTime) {
+    long timeSinceLastHeartbeat = 
+      (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
+    lastHeartbeatTime = currentTime;
+    for (JobInfo info: infos.values()) {
+      if (info.skippedAtLastHeartbeat) {
+        info.timeWaitedForLocalMap += timeSinceLastHeartbeat;
+        info.skippedAtLastHeartbeat = false;
+      }
+    }
+  }
+
+  /**
+   * Update a job's locality level and locality wait variables given that that 
+   * it has just launched a map task on a given task tracker.
+   */
+  private void updateLastMapLocalityLevel(JobInProgress job,
+      Task mapTaskLaunched, TaskTrackerStatus tracker) {
+    JobInfo info = infos.get(job);
+    LocalityLevel localityLevel = LocalityLevel.fromTask(
+        job, mapTaskLaunched, tracker);
+    info.lastMapLocalityLevel = localityLevel;
+    info.timeWaitedForLocalMap = 0;
+    eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel);
+  }
+
+  /**
+   * Get the maximum locality level at which a given job is allowed to
+   * launch tasks, based on how long it has been waiting for local tasks.
+   * This is used to implement the "delay scheduling" feature of the Fair
+   * Scheduler for optimizing data locality.
+   * If the job has no locality information (e.g. it does not use HDFS), this 
+   * method returns LocalityLevel.ANY, allowing tasks at any level.
+   * Otherwise, the job can only launch tasks at its current locality level
+   * or lower, unless it has waited at least nodeLocalityDelay or
+   * rackLocalityDelay milliseconds depends on the current level. If it
+   * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
+   * it can go to any level.
+   */
+  protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
+      long currentTime) {
+    JobInfo info = infos.get(job);
+    if (info == null) { // Job not in infos (shouldn't happen)
+      LOG.error("getAllowedLocalityLevel called on job " + job
+          + ", which does not have a JobInfo in infos");
+      return LocalityLevel.ANY;
+    }
+    if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
+      return LocalityLevel.ANY;
+    }
+    // Don't wait for locality if the job's pool is starving for maps
+    Pool pool = poolMgr.getPool(job);
+    PoolSchedulable sched = pool.getMapSchedulable();
+    long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
+    long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
+    if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
+        currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+      eventLog.log("INFO", "No delay scheduling for "
+          + job.getJobID() + " because it is being starved");
+      return LocalityLevel.ANY;
+    }
+    // In the common case, compute locality level based on time waited
+    switch(info.lastMapLocalityLevel) {
+    case NODE: // Last task launched was node-local
+      if (info.timeWaitedForLocalMap >=
+          nodeLocalityDelay + rackLocalityDelay)
+        return LocalityLevel.ANY;
+      else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
+        return LocalityLevel.RACK;
+      else
+        return LocalityLevel.NODE;
+    case RACK: // Last task launched was rack-local
+      if (info.timeWaitedForLocalMap >= rackLocalityDelay)
+        return LocalityLevel.ANY;
+      else
+        return LocalityLevel.RACK;
+    default: // Last task was non-local; can launch anywhere
+      return LocalityLevel.ANY;
     }
   }
   
@@ -327,11 +626,25 @@ public class FairScheduler extends TaskS
    * and needed tasks of each type. 
    */
   protected void update() {
-    //Making more granual locking so that clusterStatus can be fetched from Jobtracker.
+    // Making more granular locking so that clusterStatus can be fetched 
+    // from Jobtracker without locking the scheduler.
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
-    // Got clusterStatus hence acquiring scheduler lock now
-    // Remove non-running jobs
-    synchronized(this){
+    
+    // Recompute locality delay from JobTracker heartbeat interval if enabled.
+    // This will also lock the JT, so do it outside of a fair scheduler lock.
+    if (autoComputeLocalityDelay) {
+      JobTracker jobTracker = (JobTracker) taskTrackerManager;
+      nodeLocalityDelay = Math.min(MAX_AUTOCOMPUTED_LOCALITY_DELAY,
+          (long) (1.5 * jobTracker.getNextHeartbeatInterval()));
+      rackLocalityDelay = nodeLocalityDelay;
+    }
+    
+    // Got clusterStatus hence acquiring scheduler lock now.
+    synchronized (this) {
+      // Reload allocations file if it hasn't been loaded in a while
+      poolMgr.reloadAllocsIfNecessary();
+      
+      // Remove any jobs that have stopped running
       List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
       for (JobInProgress job: infos.keySet()) { 
         int runState = job.getStatus().getRunState();
@@ -341,30 +654,52 @@ public class FairScheduler extends TaskS
         }
       }
       for (JobInProgress job: toRemove) {
-        infos.remove(job);
-        poolMgr.removeJob(job);
+        jobNoLongerRunning(job);
+      }
+      
+      updateRunnability(); // Set job runnability based on user/pool limits 
+      
+      // Update demands of jobs and pools
+      for (Pool pool: poolMgr.getPools()) {
+        pool.getMapSchedulable().updateDemand();
+        pool.getReduceSchedulable().updateDemand();
       }
-      // Update running jobs with deficits since last update, and compute new
-      // slot allocations, weight, shares and task counts
-      long now = clock.getTime();
-      long timeDelta = now - lastUpdateTime;
-      updateDeficits(timeDelta);
-      updateRunnability();
-      updateTaskCounts();
-      updateWeights();
-      updateMinSlots();
-      updateFairShares(clusterStatus);
-      lastUpdateTime = now;
+      
+      // Compute fair shares based on updated demands
+      List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
+      List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
+      SchedulingAlgorithms.computeFairShares(
+          mapScheds, clusterStatus.getMaxMapTasks());
+      SchedulingAlgorithms.computeFairShares(
+          reduceScheds, clusterStatus.getMaxReduceTasks());
+      
+      // Use the computed shares to assign shares within each pool
+      for (Pool pool: poolMgr.getPools()) {
+        pool.getMapSchedulable().redistributeShare();
+        pool.getReduceSchedulable().redistributeShare();
+      }
+      
+      if (preemptionEnabled)
+        updatePreemptionVariables();
     }
   }
+
+  private void jobNoLongerRunning(JobInProgress job) {
+    assert Thread.holdsLock(this);
+    JobInfo info = infos.remove(job);
+    if (info != null) {
+      info.mapSchedulable.cleanupMetrics();
+      info.reduceSchedulable.cleanupMetrics();
+    }
+    poolMgr.removeJob(job);
+  }
   
-  private void updateDeficits(long timeDelta) {
-    for (JobInfo info: infos.values()) {
-      info.mapDeficit +=
-        (info.mapFairShare - info.runningMaps) * timeDelta;
-      info.reduceDeficit +=
-        (info.reduceFairShare - info.runningReduces) * timeDelta;
+  public List<PoolSchedulable> getPoolSchedulables(TaskType type) {
+    List<PoolSchedulable> scheds = new ArrayList<PoolSchedulable>();
+    for (Pool pool: poolMgr.getPools()) {
+      scheds.add(pool.getSchedulable(type));
     }
+    return scheds;
   }
   
   private void updateRunnability() {
@@ -380,301 +715,45 @@ public class FairScheduler extends TaskS
     Map<String, Integer> userJobs = new HashMap<String, Integer>();
     Map<String, Integer> poolJobs = new HashMap<String, Integer>();
     for (JobInProgress job: jobs) {
-      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-        String user = job.getJobConf().getUser();
-        String pool = poolMgr.getPoolName(job);
-        int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
-        int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
-        if (userCount < poolMgr.getUserMaxJobs(user) && 
-            poolCount < poolMgr.getPoolMaxJobs(pool)) {
-          infos.get(job).runnable = true;
+      String user = job.getJobConf().getUser();
+      String pool = poolMgr.getPoolName(job);
+      int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
+      int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
+      if (userCount < poolMgr.getUserMaxJobs(user) &&
+          poolCount < poolMgr.getPoolMaxJobs(pool)) {
+        if (job.getStatus().getRunState() == JobStatus.RUNNING ||
+            job.getStatus().getRunState() == JobStatus.PREP) {
           userJobs.put(user, userCount + 1);
           poolJobs.put(pool, poolCount + 1);
-        }
-      }
-    }
-  }
-
-  private void updateTaskCounts() {
-    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-      JobInProgress job = entry.getKey();
-      JobInfo info = entry.getValue();
-      if (job.getStatus().getRunState() != JobStatus.RUNNING)
-        continue; // Job is still in PREP state and tasks aren't initialized
-      // Count maps
-      int totalMaps = job.numMapTasks;
-      int finishedMaps = 0;
-      int runningMaps = 0;
-      for (TaskInProgress tip : 
-           job.getTasks(org.apache.hadoop.mapreduce.TaskType.MAP)) {
-        if (tip.isComplete()) {
-          finishedMaps += 1;
-        } else if (tip.isRunning()) {
-          runningMaps += tip.getActiveTasks().size();
-        }
-      }
-      info.runningMaps = runningMaps;
-      info.neededMaps = (totalMaps - runningMaps - finishedMaps
-          + taskSelector.neededSpeculativeMaps(job));
-      // Count reduces
-      int totalReduces = job.numReduceTasks;
-      int finishedReduces = 0;
-      int runningReduces = 0;
-      for (TaskInProgress tip : 
-           job.getTasks(org.apache.hadoop.mapreduce.TaskType.REDUCE)) {
-        if (tip.isComplete()) {
-          finishedReduces += 1;
-        } else if (tip.isRunning()) {
-          runningReduces += tip.getActiveTasks().size();
-        }
-      }
-      info.runningReduces = runningReduces;
-      if (enoughMapsFinishedToRunReduces(finishedMaps, totalMaps)) {
-        info.neededReduces = (totalReduces - runningReduces - finishedReduces 
-            + taskSelector.neededSpeculativeReduces(job));
-      } else {
-        info.neededReduces = 0;
-      }
-      // If the job was marked as not runnable due to its user or pool having
-      // too many active jobs, set the neededMaps/neededReduces to 0. We still
-      // count runningMaps/runningReduces however so we can give it a deficit.
-      if (!info.runnable) {
-        info.neededMaps = 0;
-        info.neededReduces = 0;
-      }
-    }
-  }
-
-  /**
-   * Has a job finished enough maps to allow launching its reduces?
-   */
-  protected boolean enoughMapsFinishedToRunReduces(
-      int finishedMaps, int totalMaps) {
-    if (waitForMapsBeforeLaunchingReduces) {
-      return finishedMaps >= Math.max(1, totalMaps * 0.05);
-    } else {
-      return true;
-    }
-  }
-
-  private void updateWeights() {
-    // First, calculate raw weights for each job
-    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-      JobInProgress job = entry.getKey();
-      JobInfo info = entry.getValue();
-      info.mapWeight = calculateRawWeight(job, TaskType.MAP);
-      info.reduceWeight = calculateRawWeight(job, TaskType.REDUCE);
-    }
-    // Now calculate job weight sums for each pool
-    Map<String, Double> mapWeightSums = new HashMap<String, Double>();
-    Map<String, Double> reduceWeightSums = new HashMap<String, Double>();
-    for (Pool pool: poolMgr.getPools()) {
-      double mapWeightSum = 0;
-      double reduceWeightSum = 0;
-      for (JobInProgress job: pool.getJobs()) {
-        if (isRunnable(job)) {
-          if (runnableTasks(job, TaskType.MAP) > 0) {
-            mapWeightSum += infos.get(job).mapWeight;
-          }
-          if (runnableTasks(job, TaskType.REDUCE) > 0) {
-            reduceWeightSum += infos.get(job).reduceWeight;
-          }
-        }
-      }
-      mapWeightSums.put(pool.getName(), mapWeightSum);
-      reduceWeightSums.put(pool.getName(), reduceWeightSum);
-    }
-    // And normalize the weights based on pool sums and pool weights
-    // to share fairly across pools (proportional to their weights)
-    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-      JobInProgress job = entry.getKey();
-      JobInfo info = entry.getValue();
-      String pool = poolMgr.getPoolName(job);
-      double poolWeight = poolMgr.getPoolWeight(pool);
-      double mapWeightSum = mapWeightSums.get(pool);
-      double reduceWeightSum = reduceWeightSums.get(pool);
-      if (mapWeightSum == 0)
-        info.mapWeight = 0;
-      else
-        info.mapWeight *= (poolWeight / mapWeightSum); 
-      if (reduceWeightSum == 0)
-        info.reduceWeight = 0;
-      else
-        info.reduceWeight *= (poolWeight / reduceWeightSum); 
-    }
-  }
-  
-  private void updateMinSlots() {
-    // Clear old minSlots
-    for (JobInfo info: infos.values()) {
-      info.minMaps = 0;
-      info.minReduces = 0;
-    }
-    // For each pool, distribute its task allocation among jobs in it that need
-    // slots. This is a little tricky since some jobs in the pool might not be
-    // able to use all the slots, e.g. they might have only a few tasks left.
-    // To deal with this, we repeatedly split up the available task slots
-    // between the jobs left, give each job min(its alloc, # of slots it needs),
-    // and redistribute any slots that are left over between jobs that still
-    // need slots on the next pass. If, in total, the jobs in our pool don't
-    // need all its allocation, we leave the leftover slots for general use.
-    PoolManager poolMgr = getPoolManager();
-    for (Pool pool: poolMgr.getPools()) {
-      for (final TaskType type: TaskType.values()) {
-        Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
-        int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
-        // Keep assigning slots until none are left
-        while (slotsLeft > 0) {
-          // Figure out total weight of jobs that still need slots
-          double totalWeight = 0;
-          for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
-            JobInProgress job = it.next();
-            if (isRunnable(job) &&
-                runnableTasks(job, type) > minTasks(job, type)) {
-              totalWeight += weight(job, type);
-            } else {
-              it.remove();
+          JobInfo jobInfo = infos.get(job);
+          if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+            jobInfo.runnable = true;
+          } else {
+            // The job is in the PREP state. Give it to the job initializer
+            // for initialization if we have not already done it.
+            if (jobInfo.needsInitializing) {
+              jobInfo.needsInitializing = false;
+              jobInitializer.initJob(jobInfo, job);
             }
           }
-          if (totalWeight == 0) // No jobs that can use more slots are left 
-            break;
-          // Assign slots to jobs, using the floor of their weight divided by
-          // total weight. This ensures that all jobs get some chance to take
-          // a slot. Then, if no slots were assigned this way, we do another
-          // pass where we use ceil, in case some slots were still left over.
-          int oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
-          for (JobInProgress job: jobs) {
-            double weight = weight(job, type);
-            int share = (int) Math.floor(oldSlots * weight / totalWeight);
-            slotsLeft = giveMinSlots(job, type, slotsLeft, share);
-          }
-          if (slotsLeft == oldSlots) {
-            // No tasks were assigned; do another pass using ceil, giving the
-            // extra slots to jobs in order of weight then deficit
-            List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);
-            Collections.sort(sortedJobs, new Comparator<JobInProgress>() {
-              public int compare(JobInProgress j1, JobInProgress j2) {
-                double dif = weight(j2, type) - weight(j1, type);
-                if (dif == 0) // Weights are equal, compare by deficit 
-                  dif = deficit(j2, type) - deficit(j1, type);
-                return (int) Math.signum(dif);
-              }
-            });
-            for (JobInProgress job: sortedJobs) {
-              double weight = weight(job, type);
-              int share = (int) Math.ceil(oldSlots * weight / totalWeight);
-              slotsLeft = giveMinSlots(job, type, slotsLeft, share);
-            }
-            if (slotsLeft > 0) {
-              LOG.warn("Had slotsLeft = " + slotsLeft + " after the final "
-                  + "loop in updateMinSlots. This probably means some fair "
-                  + "scheduler weights are being set to NaN or Infinity.");
-            }
-            break;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Give up to <code>tasksToGive</code> min slots to a job (potentially fewer
-   * if either the job needs fewer slots or there aren't enough slots left).
-   * Returns the number of slots left over.
-   */
-  private int giveMinSlots(JobInProgress job, TaskType type,
-      int slotsLeft, int slotsToGive) {
-    int runnable = runnableTasks(job, type);
-    int curMin = minTasks(job, type);
-    slotsToGive = Math.min(Math.min(slotsLeft, runnable - curMin), slotsToGive);
-    slotsLeft -= slotsToGive;
-    JobInfo info = infos.get(job);
-    if (type == TaskType.MAP)
-      info.minMaps += slotsToGive;
-    else
-      info.minReduces += slotsToGive;
-    return slotsLeft;
-  }
-
-  private void updateFairShares(ClusterStatus clusterStatus) {
-    // Clear old fairShares
-    for (JobInfo info: infos.values()) {
-      info.mapFairShare = 0;
-      info.reduceFairShare = 0;
-    }
-    // Assign new shares, based on weight and minimum share. This is done
-    // as follows. First, we split up the available slots between all
-    // jobs according to weight. Then if there are any jobs whose minSlots is
-    // larger than their fair allocation, we give them their minSlots and
-    // remove them from the list, and start again with the amount of slots
-    // left over. This continues until all jobs' minSlots are less than their
-    // fair allocation, and at this point we know that we've met everyone's
-    // guarantee and we've split the excess capacity fairly among jobs left.
-    for (TaskType type: TaskType.values()) {
-      // Select only jobs that still need this type of task
-      HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
-      for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-        JobInProgress job = entry.getKey();
-        JobInfo info = entry.getValue();
-        if (isRunnable(job) && runnableTasks(job, type) > 0) {
-          jobsLeft.add(info);
-        }
-      }
-      double slotsLeft = getTotalSlots(type, clusterStatus);
-      while (!jobsLeft.isEmpty()) {
-        double totalWeight = 0;
-        for (JobInfo info: jobsLeft) {
-          double weight = (type == TaskType.MAP ?
-              info.mapWeight : info.reduceWeight);
-          totalWeight += weight;
-        }
-        boolean recomputeSlots = false;
-        double oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
-        for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {
-          JobInfo info = iter.next();
-          double minSlots = (type == TaskType.MAP ?
-              info.minMaps : info.minReduces);
-          double weight = (type == TaskType.MAP ?
-              info.mapWeight : info.reduceWeight);
-          double fairShare = weight / totalWeight * oldSlots;
-          if (minSlots > fairShare) {
-            // Job needs more slots than its fair share; give it its minSlots,
-            // remove it from the list, and set recomputeSlots = true to 
-            // remember that we must loop again to redistribute unassigned slots
-            if (type == TaskType.MAP)
-              info.mapFairShare = minSlots;
-            else
-              info.reduceFairShare = minSlots;
-            slotsLeft -= minSlots;
-            iter.remove();
-            recomputeSlots = true;
-          }
-        }
-        if (!recomputeSlots) {
-          // All minimums are met. Give each job its fair share of excess slots.
-          for (JobInfo info: jobsLeft) {
-            double weight = (type == TaskType.MAP ?
-                info.mapWeight : info.reduceWeight);
-            double fairShare = weight / totalWeight * oldSlots;
-            if (type == TaskType.MAP)
-              info.mapFairShare = fairShare;
-            else
-              info.reduceFairShare = fairShare;
-          }
-          break;
         }
       }
     }
   }
 
-  private double calculateRawWeight(JobInProgress job, TaskType taskType) {
+  public double getJobWeight(JobInProgress job, TaskType taskType) {
     if (!isRunnable(job)) {
-      return 0;
+      // Job won't launch tasks, but don't return 0 to avoid division errors
+      return 1.0;
     } else {
       double weight = 1.0;
       if (sizeBasedWeight) {
         // Set weight based on runnable tasks
-        weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);
+        JobInfo info = infos.get(job);
+        int runnableTasks = (taskType == TaskType.MAP) ?
+            info.mapSchedulable.getDemand() : 
+            info.reduceSchedulable.getDemand();
+        weight = Math.log1p(runnableTasks) / Math.log(2);
       }
       weight *= getPriorityFactor(job.getPriority());
       if (weightAdjuster != null) {
@@ -704,49 +783,220 @@ public class FairScheduler extends TaskS
       clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks());
   }
 
-  public synchronized boolean getUseFifo() {
-    return useFifo;
-  }
-  
-  public synchronized void setUseFifo(boolean useFifo) {
-    this.useFifo = useFifo;
+  /**
+   * Update the preemption fields for all PoolScheduables, i.e. the times since
+   * each pool last was at its guaranteed share and at > 1/2 of its fair share
+   * for each type of task.
+   */
+  private void updatePreemptionVariables() {
+    long now = clock.getTime();
+    lastPreemptionUpdateTime = now;
+    for (TaskType type: MAP_AND_REDUCE) {
+      for (PoolSchedulable sched: getPoolSchedulables(type)) {
+        if (!isStarvedForMinShare(sched)) {
+          sched.setLastTimeAtMinShare(now);
+        }
+        if (!isStarvedForFairShare(sched)) {
+          sched.setLastTimeAtHalfFairShare(now);
+        }
+        eventLog.log("PREEMPT_VARS", sched.getName(), type,
+            now - sched.getLastTimeAtMinShare(),
+            now - sched.getLastTimeAtHalfFairShare());
+      }
+    }
   }
-  
-  // Getter methods for reading JobInfo values based on TaskType, safely
-  // returning 0's for jobs with no JobInfo present.
 
-  protected int neededTasks(JobInProgress job, TaskType taskType) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return taskType == TaskType.MAP ? info.neededMaps : info.neededReduces;
+  /**
+   * Is a pool below its min share for the given task type?
+   */
+  boolean isStarvedForMinShare(PoolSchedulable sched) {
+    int desiredShare = Math.min(sched.getMinShare(), sched.getDemand());
+    return (sched.getRunningTasks() < desiredShare);
   }
   
-  protected int runningTasks(JobInProgress job, TaskType taskType) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return taskType == TaskType.MAP ? info.runningMaps : info.runningReduces;
-  }
-
-  protected int runnableTasks(JobInProgress job, TaskType type) {
-    return neededTasks(job, type) + runningTasks(job, type);
+  /**
+   * Is a pool being starved for fair share for the given task type?
+   * This is defined as being below half its fair share.
+   */
+  boolean isStarvedForFairShare(PoolSchedulable sched) {
+    int desiredFairShare = (int) Math.floor(Math.min(
+        sched.getFairShare() / 2, sched.getDemand()));
+    return (sched.getRunningTasks() < desiredFairShare);
   }
 
-  protected int minTasks(JobInProgress job, TaskType type) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return (type == TaskType.MAP) ? info.minMaps : info.minReduces;
+  /**
+   * Check for pools that need tasks preempted, either because they have been
+   * below their guaranteed share for minSharePreemptionTimeout or they
+   * have been below half their fair share for the fairSharePreemptionTimeout.
+   * If such pools exist, compute how many tasks of each type need to be
+   * preempted and then select the right ones using preemptTasks.
+   * 
+   * This method computes and logs the number of tasks we want to preempt even
+   * if preemption is disabled, for debugging purposes.
+   */
+  protected void preemptTasksIfNecessary() {
+    if (!preemptionEnabled)
+      return;
+    
+    long curTime = clock.getTime();
+    if (curTime - lastPreemptCheckTime < preemptionInterval)
+      return;
+    lastPreemptCheckTime = curTime;
+    
+    // Acquire locks on both the JobTracker (task tracker manager) and this
+    // because we might need to call some JobTracker methods (killTask).
+    synchronized (taskTrackerManager) {
+      synchronized (this) {
+        for (TaskType type: MAP_AND_REDUCE) {
+          List<PoolSchedulable> scheds = getPoolSchedulables(type);
+          int tasksToPreempt = 0;
+          for (PoolSchedulable sched: scheds) {
+            tasksToPreempt += tasksToPreempt(sched, curTime);
+          }
+          if (tasksToPreempt > 0) {
+            eventLog.log("SHOULD_PREEMPT", type, tasksToPreempt);
+            if (!onlyLogPreemption) {
+              preemptTasks(scheds, tasksToPreempt);
+            }
+          }
+        }
+      }
+    }
   }
 
-  protected double weight(JobInProgress job, TaskType taskType) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return (taskType == TaskType.MAP ? info.mapWeight : info.reduceWeight);
+  /**
+   * Preempt a given number of tasks from a list of PoolSchedulables. 
+   * The policy for this is to pick tasks from pools that are over their fair 
+   * share, but make sure that no pool is placed below its fair share in the 
+   * process. Furthermore, we want to minimize the amount of computation
+   * wasted by preemption, so out of the tasks in over-scheduled pools, we
+   * prefer to preempt tasks that started most recently.
+   */
+  private void preemptTasks(List<PoolSchedulable> scheds, int tasksToPreempt) {
+    if (scheds.isEmpty() || tasksToPreempt == 0)
+      return;
+    
+    TaskType taskType = scheds.get(0).getTaskType();
+    
+    // Collect running tasks of our type from over-scheduled pools
+    List<TaskStatus> runningTasks = new ArrayList<TaskStatus>();
+    for (PoolSchedulable sched: scheds) {
+      if (sched.getRunningTasks() > sched.getFairShare())
+      for (JobSchedulable js: sched.getJobSchedulables()) {
+        runningTasks.addAll(getRunningTasks(js.getJob(), taskType));
+      }
+    }
+    
+    // Sort tasks into reverse order of start time
+    Collections.sort(runningTasks, new Comparator<TaskStatus>() {
+      public int compare(TaskStatus t1, TaskStatus t2) {
+        if (t1.getStartTime() < t2.getStartTime())
+          return 1;
+        else if (t1.getStartTime() == t2.getStartTime())
+          return 0;
+        else
+          return -1;
+      }
+    });
+    
+    // Maintain a count of tasks left in each pool; this is a bit
+    // faster than calling runningTasks() on the pool repeatedly
+    // because the latter must scan through jobs in the pool
+    HashMap<Pool, Integer> tasksLeft = new HashMap<Pool, Integer>(); 
+    for (Pool p: poolMgr.getPools()) {
+      tasksLeft.put(p, p.getSchedulable(taskType).getRunningTasks());
+    }
+    
+    // Scan down the sorted list of task statuses until we've killed enough
+    // tasks, making sure we don't kill too many from any pool
+    for (TaskStatus status: runningTasks) {
+      JobID jobID = status.getTaskID().getJobID();
+      JobInProgress job = taskTrackerManager.getJob(jobID);
+      Pool pool = poolMgr.getPool(job);
+      PoolSchedulable sched = pool.getSchedulable(taskType);
+      int tasksLeftForPool = tasksLeft.get(pool);
+      if (tasksLeftForPool > sched.getFairShare()) {
+        eventLog.log("PREEMPT", status.getTaskID(),
+            status.getTaskTracker());
+        try {
+          taskTrackerManager.killTask(status.getTaskID(), false);
+          tasksToPreempt--;
+          if (tasksToPreempt == 0)
+            break;
+          
+          // reduce tasks left for pool
+          tasksLeft.put(pool, --tasksLeftForPool);
+        } catch (IOException e) {
+          LOG.error("Failed to kill task " + status.getTaskID(), e);
+        }
+      }
+    }
   }
 
-  protected double deficit(JobInProgress job, TaskType taskType) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return taskType == TaskType.MAP ? info.mapDeficit : info.reduceDeficit;
+  /**
+   * Count how many tasks of a given type the pool needs to preempt, if any.
+   * If the pool has been below its min share for at least its preemption
+   * timeout, it should preempt the difference between its current share and
+   * this min share. If it has been below half its fair share for at least the
+   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
+   * its full fair share. If both conditions hold, we preempt the max of the
+   * two amounts (this shouldn't happen unless someone sets the timeouts to
+   * be identical for some reason).
+   */
+  protected int tasksToPreempt(PoolSchedulable sched, long curTime) {
+    String pool = sched.getName();
+    long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool);
+    long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
+    int tasksDueToMinShare = 0;
+    int tasksDueToFairShare = 0;
+    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
+      int target = Math.min(sched.getMinShare(), sched.getDemand());
+      tasksDueToMinShare = Math.max(0, target - sched.getRunningTasks());
+    }
+    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+      int target = (int) Math.min(sched.getFairShare(), sched.getDemand());
+      tasksDueToFairShare = Math.max(0, target - sched.getRunningTasks());
+    }
+    int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare);
+    if (tasksToPreempt > 0) {
+      String message = "Should preempt " + tasksToPreempt + " " 
+          + sched.getTaskType() + " tasks for pool " + sched.getName() 
+          + ": tasksDueToMinShare = " + tasksDueToMinShare
+          + ", tasksDueToFairShare = " + tasksDueToFairShare;
+      eventLog.log("INFO", message);
+      LOG.info(message);
+    }
+    return tasksToPreempt;
+  }
+
+  private List<TaskStatus> getRunningTasks(JobInProgress job, TaskType type) {
+    // Create a list of all running TaskInProgress'es in the job
+    Set<TaskInProgress> tips = new HashSet<TaskInProgress>();
+    if (type == TaskType.MAP) {
+      // Jobs may have both "non-local maps" which have a split with no locality
+      // info (e.g. the input file is not in HDFS), and maps with locality info,
+      // which are stored in the runningMapCache map from location to task list
+      tips.addAll(job.nonLocalRunningMaps);
+      for (Set<TaskInProgress> set: job.runningMapCache.values()) {
+        tips.addAll(set);
+      }
+    }
+    else {
+      tips.addAll(job.runningReduces);
+    }
+    // Get the active TaskStatus'es for each TaskInProgress (there may be
+    // more than one if the task has multiple copies active due to speculation)
+    List<TaskStatus> statuses = new ArrayList<TaskStatus>();
+    for (TaskInProgress tip: tips) {
+      for (TaskAttemptID id: tip.getActiveTasks().keySet()) {
+        TaskStatus stat = tip.getTaskStatus(id);
+        // status is null when the task has been scheduled but not yet running
+        if (stat != null) {
+          statuses.add(stat);
+        }
+      }
+    }
+    return statuses;
   }
 
   protected boolean isRunnable(JobInProgress job) {
@@ -760,4 +1010,92 @@ public class FairScheduler extends TaskS
     Pool myJobPool = poolMgr.getPool(queueName);
     return myJobPool.getJobs();
   }
+
+  protected void dumpIfNecessary() {
+    long now = clock.getTime();
+    long timeDelta = now - lastDumpTime;
+    if (timeDelta > dumpInterval && eventLog.isEnabled()) {
+      dump();
+      lastDumpTime = now;
+    }
+  }
+
+  /**
+   * Dump scheduler state to the fairscheduler log.
+   */
+  private synchronized void dump() {
+    synchronized (eventLog) {
+      eventLog.log("BEGIN_DUMP");
+      // List jobs in order of submit time
+      ArrayList<JobInProgress> jobs = 
+        new ArrayList<JobInProgress>(infos.keySet());
+      Collections.sort(jobs, new Comparator<JobInProgress>() {
+        public int compare(JobInProgress j1, JobInProgress j2) {
+          return (int) Math.signum(j1.getStartTime() - j2.getStartTime());
+        }
+      });
+      // Dump info for each job
+      for (JobInProgress job: jobs) {
+        JobProfile profile = job.getProfile();
+        JobInfo info = infos.get(job);
+        Schedulable ms = info.mapSchedulable;
+        Schedulable rs = info.reduceSchedulable;
+        eventLog.log("JOB",
+            profile.getJobID(), profile.name, profile.user,
+            job.getPriority(), poolMgr.getPoolName(job),
+            job.numMapTasks, ms.getRunningTasks(),
+            ms.getDemand(), ms.getFairShare(), ms.getWeight(),
+            job.numReduceTasks, rs.getRunningTasks(),
+            rs.getDemand(), rs.getFairShare(), rs.getWeight());
+      }
+      // List pools in alphabetical order
+      List<Pool> pools = new ArrayList<Pool>(poolMgr.getPools());
+      Collections.sort(pools, new Comparator<Pool>() {
+        public int compare(Pool p1, Pool p2) {
+          if (p1.isDefaultPool())
+            return 1;
+          else if (p2.isDefaultPool())
+            return -1;
+          else return p1.getName().compareTo(p2.getName());
+        }});
+      for (Pool pool: pools) {
+        int runningMaps = 0;
+        int runningReduces = 0;
+        for (JobInProgress job: pool.getJobs()) {
+          JobInfo info = infos.get(job);
+          if (info != null) {
+            // TODO: Fix
+            //runningMaps += info.runningMaps;
+            //runningReduces += info.runningReduces;
+          }
+        }
+        String name = pool.getName();
+        eventLog.log("POOL",
+            name, poolMgr.getPoolWeight(name), pool.getJobs().size(),
+            poolMgr.getAllocation(name, TaskType.MAP), runningMaps,
+            poolMgr.getAllocation(name, TaskType.REDUCE), runningReduces);
+      }
+      // Dump info for each pool
+      eventLog.log("END_DUMP");
+    }
+  }
+
+  public Clock getClock() {
+    return clock;
+  }
+  
+  public FairSchedulerEventLog getEventLog() {
+    return eventLog;
+  }
+
+  public JobInfo getJobInfo(JobInProgress job) {
+    return infos.get(job);
+  }
+  
+  boolean isPreemptionEnabled() {
+    return preemptionEnabled;
+  }
+  long getLastPreemptionUpdateTime() {
+    return lastPreemptionUpdateTime;
+  }
 }

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.DailyRollingFileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Event log used by the fair scheduler for machine-readable debug info.
+ * This class uses a log4j rolling file appender to write the log, but uses
+ * a custom tab-separated event format of the form:
+ * <pre>
+ * DATE    EVENT_TYPE   PARAM_1   PARAM_2   ...
+ * </pre>
+ * Various event types are used by the fair scheduler. The purpose of logging
+ * in this format is to enable tools to parse the history log easily and read
+ * internal scheduler variables, rather than trying to make the log human
+ * readable. The fair scheduler also logs human readable messages in the
+ * JobTracker's main log.
+ * 
+ * Constructing this class creates a disabled log. It must be initialized
+ * using {@link FairSchedulerEventLog#init(Configuration, String)} to begin
+ * writing to the file.
+ */
+class FairSchedulerEventLog {
+  private static final Log LOG = LogFactory.getLog(
+    "org.apache.hadoop.mapred.FairSchedulerEventLog");
+  
+  /** Set to true if logging is disabled due to an error. */
+  private boolean logDisabled = true;
+  
+  /**
+   * Log directory, set by mapred.fairscheduler.eventlog.location in conf file;
+   * defaults to {hadoop.log.dir}/fairscheduler.
+   */
+  private String logDir;
+  
+  /** 
+   * Active log file, which is {LOG_DIR}/hadoop-{user}-fairscheduler.{host}.log.
+   * Older files are also stored as {LOG_FILE}.date (date format YYYY-MM-DD).
+   */ 
+  private String logFile;
+  
+  /** Log4j appender used to write to the log file */
+  private DailyRollingFileAppender appender;
+
+  boolean init(Configuration conf, String jobtrackerHostname) {
+    try {
+      logDir = conf.get("mapred.fairscheduler.eventlog.location",
+          new File(System.getProperty("hadoop.log.dir")).getAbsolutePath()
+          + File.separator + "fairscheduler");
+      Path logDirPath = new Path(logDir);
+      FileSystem fs = logDirPath.getFileSystem(conf);
+      if (!fs.exists(logDirPath)) {
+        if (!fs.mkdirs(logDirPath)) {
+          throw new IOException(
+              "Mkdirs failed to create " + logDirPath.toString());
+        }
+      }
+      String username = System.getProperty("user.name");
+      logFile = String.format("%s%shadoop-%s-fairscheduler-%s.log",
+          logDir, File.separator, username, jobtrackerHostname);
+      logDisabled = false;
+      PatternLayout layout = new PatternLayout("%d{ISO8601}\t%m%n");
+      appender = new DailyRollingFileAppender(layout, logFile, "'.'yyyy-MM-dd");
+      appender.activateOptions();
+      LOG.info("Initialized fair scheduler event log, logging to " + logFile);
+    } catch (IOException e) {
+      LOG.error(
+          "Failed to initialize fair scheduler event log. Disabling it.", e);
+      logDisabled = true;
+    }
+    return !(logDisabled);
+  }
+  
+  /**
+   * Log an event, writing a line in the log file of the form
+   * <pre>
+   * DATE    EVENT_TYPE   PARAM_1   PARAM_2   ...
+   * </pre>
+   */
+  synchronized void log(String eventType, Object... params) {
+    try {
+      if (logDisabled)
+        return;
+      StringBuffer buffer = new StringBuffer();
+      buffer.append(eventType);
+      for (Object param: params) {
+        buffer.append("\t");
+        buffer.append(param);
+      }
+      String message = buffer.toString();
+      Logger logger = Logger.getLogger(getClass());
+      appender.append(new LoggingEvent("", logger, Level.INFO, message, null));
+    } catch (Exception e) {
+      LOG.error("Failed to append to fair scheduler event log", e);
+      logDisabled = true;
+    }
+  }
+  
+  /**
+   * Flush and close the log.
+   */
+  void shutdown() {
+    try {
+      if (appender != null)
+        appender.close();
+    } catch (Exception e) {}
+    logDisabled = true;
+  }
+
+  boolean isEnabled() {
+    return !logDisabled;
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Sun Sep 11 23:57:37 2011
@@ -30,6 +30,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 
 import javax.servlet.ServletContext;
@@ -39,16 +40,15 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.StringUtils;
 
 /**
  * Servlet for displaying fair scheduler information, installed at
  * [job tracker URL]/scheduler when the {@link FairScheduler} is in use.
  * 
- * The main features are viewing each job's task count and fair share, ability
- * to change job priorities and pools from the UI, and ability to switch the
- * scheduler to FIFO mode without restarting the JobTracker if this is required
- * for any reason.
+ * The main features are viewing each job's task count and fair share,
+ * and admin controls to change job priorities and pools from the UI.
  * 
  * There is also an "advanced" view for debugging that can be turned on by
  * going to [job tracker URL]/scheduler?advanced.
@@ -82,13 +82,9 @@ public class FairSchedulerServlet extend
     // If the request has a set* param, handle that and redirect to the regular
     // view page so that the user won't resubmit the data if they hit refresh.
     boolean advancedView = request.getParameter("advanced") != null;
-    if (request.getParameter("setFifo") != null) {
-      scheduler.setUseFifo(request.getParameter("setFifo").equals("true"));
-      response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
-      return;
-    }
-    if (request.getParameter("setPool") != null) {
-      Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+    if (JSPUtil.privateActionsAllowed(jobTracker.conf)
+        && request.getParameter("setPool") != null) {
+      Collection<JobInProgress> runningJobs = getInitedJobs();
       PoolManager poolMgr = null;
       synchronized (scheduler) {
         poolMgr = scheduler.getPoolManager();
@@ -107,8 +103,9 @@ public class FairSchedulerServlet extend
       response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
       return;
     }
-    if (request.getParameter("setPriority") != null) {
-      Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();      
+    if (JSPUtil.privateActionsAllowed(jobTracker.conf)
+        && request.getParameter("setPriority") != null) {
+      Collection<JobInProgress> runningJobs = getInitedJobs();
       JobPriority priority = JobPriority.valueOf(request.getParameter(
           "setPriority"));
       String jobId = request.getParameter("jobid");
@@ -126,22 +123,21 @@ public class FairSchedulerServlet extend
     response.setContentType("text/html");
 
     // Because the client may read arbitrarily slow, and we hold locks while
-    // the servlet output, we want to write to our own buffer which we know
+    // the servlet outputs, we want to write to our own buffer which we know
     // won't block.
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     PrintWriter out = new PrintWriter(baos);
     String hostname = StringUtils.simpleHostname(
         jobTracker.getJobTrackerMachine());
     out.print("<html><head>");
-    out.printf("<title>%s Job Scheduler Admininstration</title>\n", hostname);
+    out.printf("<title>%s Fair Scheduler Administration</title>\n", hostname);
     out.print("<link rel=\"stylesheet\" type=\"text/css\" " + 
         "href=\"/static/hadoop.css\">\n");
     out.print("</head><body>\n");
     out.printf("<h1><a href=\"/jobtracker.jsp\">%s</a> " + 
-        "Job Scheduler Administration</h1>\n", hostname);
+        "Fair Scheduler Administration</h1>\n", hostname);
     showPools(out, advancedView);
     showJobs(out, advancedView);
-    showAdminForm(out, advancedView);
     out.print("</body></html>\n");
     out.close();
 
@@ -156,12 +152,17 @@ public class FairSchedulerServlet extend
    */
   private void showPools(PrintWriter out, boolean advancedView) {
     synchronized(scheduler) {
+      boolean warnInverted = false;
       PoolManager poolManager = scheduler.getPoolManager();
       out.print("<h2>Pools</h2>\n");
       out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
-      out.print("<tr><th>Pool</th><th>Running Jobs</th>" + 
-          "<th>Min Maps</th><th>Min Reduces</th>" + 
-          "<th>Running Maps</th><th>Running Reduces</th></tr>\n");
+      out.print("<tr><th rowspan=2>Pool</th>" +
+          "<th rowspan=2>Running Jobs</th>" + 
+          "<th colspan=4>Map Tasks</th>" + 
+          "<th colspan=4>Reduce Tasks</th>" +
+          "<th rowspan=2>Scheduling Mode</th></tr>\n<tr>" + 
+          "<th>Min Share</th><th>Max Share</th><th>Running</th><th>Fair Share</th>" + 
+          "<th>Min Share</th><th>Max Share</th><th>Running</th><th>Fair Share</th></tr>\n");
       List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
       Collections.sort(pools, new Comparator<Pool>() {
         public int compare(Pool p1, Pool p2) {
@@ -172,27 +173,54 @@ public class FairSchedulerServlet extend
           else return p1.getName().compareTo(p2.getName());
         }});
       for (Pool pool: pools) {
-        int runningMaps = 0;
-        int runningReduces = 0;
-        for (JobInProgress job: pool.getJobs()) {
-          JobInfo info = scheduler.infos.get(job);
-          if (info != null) {
-            runningMaps += info.runningMaps;
-            runningReduces += info.runningReduces;
-          }
-        }
-        out.print("<tr>\n");
-        out.printf("<td>%s</td>\n", pool.getName());
-        out.printf("<td>%s</td>\n", pool.getJobs().size());
-        out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+        String name = pool.getName();
+        int runningMaps = pool.getMapSchedulable().getRunningTasks();
+        int runningReduces = pool.getReduceSchedulable().getRunningTasks();
+        int maxMaps = poolManager.getMaxSlots(name, TaskType.MAP);
+        int maxReduces = poolManager.getMaxSlots(name, TaskType.REDUCE);
+        boolean invertedMaps = poolManager.invertedMinMax(TaskType.MAP, name);
+        boolean invertedReduces = poolManager.invertedMinMax(TaskType.REDUCE, name);
+        warnInverted = warnInverted || invertedMaps || invertedReduces;
+        out.print("<tr>");
+        out.printf("<td>%s</td>", name);
+        out.printf("<td>%d</td>", pool.getJobs().size());
+        // Map Tasks
+        out.printf("<td>%d</td>", poolManager.getAllocation(name,
             TaskType.MAP));
-        out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(), 
+        out.print("<td>");
+        if(maxMaps == Integer.MAX_VALUE) {
+          out.print("-");
+        } else {
+          out.print(maxMaps);
+        }
+        if(invertedMaps) {
+          out.print("*");
+        }
+        out.print("</td>");
+        out.printf("<td>%d</td>", runningMaps);
+        out.printf("<td>%.1f</td>", pool.getMapSchedulable().getFairShare());
+        // Reduce Tasks
+        out.printf("<td>%d</td>", poolManager.getAllocation(name,
             TaskType.REDUCE));
-        out.printf("<td>%s</td>\n", runningMaps);
-        out.printf("<td>%s</td>\n", runningReduces);
+        out.print("<td>");
+        if(maxReduces == Integer.MAX_VALUE) {
+          out.print("-");
+        } else {
+          out.print(maxReduces);
+        }
+        if(invertedReduces) {
+          out.print("*");
+        }
+        out.print("</td>");
+        out.printf("<td>%d</td>", runningReduces);
+        out.printf("<td>%.1f</td>", pool.getReduceSchedulable().getFairShare());
+        out.printf("<td>%s</td>", pool.getSchedulingMode());
         out.print("</tr>\n");
       }
       out.print("</table>\n");
+      if(warnInverted) {
+        out.print("<p>* One or more pools have max share set lower than min share. Max share will be used and minimum will be treated as if set equal to max.</p>");
+      }
     }
   }
 
@@ -202,66 +230,70 @@ public class FairSchedulerServlet extend
   private void showJobs(PrintWriter out, boolean advancedView) {
     out.print("<h2>Running Jobs</h2>\n");
     out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
-    int colsPerTaskType = advancedView ? 6 : 3;
+    int colsPerTaskType = advancedView ? 4 : 3;
     out.printf("<tr><th rowspan=2>Submitted</th>" + 
         "<th rowspan=2>JobID</th>" +
         "<th rowspan=2>User</th>" +
         "<th rowspan=2>Name</th>" +
         "<th rowspan=2>Pool</th>" +
         "<th rowspan=2>Priority</th>" +
-        "<th colspan=%d>Maps</th>" +
-        "<th colspan=%d>Reduces</th>",
+        "<th colspan=%d>Map Tasks</th>" +
+        "<th colspan=%d>Reduce Tasks</th>",
         colsPerTaskType, colsPerTaskType);
     out.print("</tr><tr>\n");
     out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
-        (advancedView ? "<th>Weight</th><th>Deficit</th><th>minMaps</th>" : ""));
+        (advancedView ? "<th>Weight</th>" : ""));
     out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
-        (advancedView ? "<th>Weight</th><th>Deficit</th><th>minReduces</th>" : ""));
+        (advancedView ? "<th>Weight</th>" : ""));
     out.print("</tr>\n");
     synchronized (jobTracker) {
-      Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+      Collection<JobInProgress> runningJobs = getInitedJobs();
       synchronized (scheduler) {
         for (JobInProgress job: runningJobs) {
           JobProfile profile = job.getProfile();
           JobInfo info = scheduler.infos.get(job);
           if (info == null) { // Job finished, but let's show 0's for info
-            info = new JobInfo();
+            info = new JobInfo(null, null);
           }
           out.print("<tr>\n");
           out.printf("<td>%s</td>\n", DATE_FORMAT.format(
-                       new Date(job.getStartTime())));
+              new Date(job.getStartTime())));
           out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
-                     profile.getJobID(), profile.getJobID());
+              profile.getJobID(), profile.getJobID());
           out.printf("<td>%s</td>\n", profile.getUser());
           out.printf("<td>%s</td>\n", profile.getJobName());
-          out.printf("<td>%s</td>\n", generateSelect(
-                       scheduler.getPoolManager().getPoolNames(),
-                       scheduler.getPoolManager().getPoolName(job),
-                       "/scheduler?setPool=<CHOICE>&jobid=" + profile.getJobID() +
-                       (advancedView ? "&advanced" : "")));
-          out.printf("<td>%s</td>\n", generateSelect(
-                       Arrays.asList(new String[]
-                         {"VERY_LOW", "LOW", "NORMAL", "HIGH", "VERY_HIGH"}),
-                       job.getPriority().toString(),
-                       "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID() +
-                       (advancedView ? "&advanced" : "")));
-          out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
-                     job.finishedMaps(), job.desiredMaps(), info.runningMaps,
-                     info.mapFairShare);
+          if (JSPUtil.privateActionsAllowed(jobTracker.conf)) {
+            out.printf("<td>%s</td>\n", generateSelect(scheduler
+                .getPoolManager().getPoolNames(), scheduler.getPoolManager()
+                .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="
+                + profile.getJobID() + (advancedView ? "&advanced" : "")));
+            out.printf("<td>%s</td>\n", generateSelect(Arrays
+                .asList(new String[] { "VERY_LOW", "LOW", "NORMAL", "HIGH",
+                    "VERY_HIGH" }), job.getPriority().toString(),
+                "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID()
+                    + (advancedView ? "&advanced" : "")));
+          } else {
+            out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
+            out.printf("<td>%s</td>\n", job.getPriority().toString());
+          }
+          Pool pool = scheduler.getPoolManager().getPool(job);
+          String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+              String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
+          out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+              job.finishedMaps(), job.desiredMaps(), 
+              info.mapSchedulable.getRunningTasks(),
+              mapShare);
           if (advancedView) {
-            out.printf("<td>%8.1f</td>\n", info.mapWeight);
-            out.printf("<td>%s</td>\n", info.neededMaps > 0 ?
-                       (info.mapDeficit / 1000) + "s" : "--");
-            out.printf("<td>%d</td>\n", info.minMaps);
+            out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
           }
-          out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
-                     job.finishedReduces(), job.desiredReduces(), info.runningReduces,
-                     info.reduceFairShare);
+          String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+              String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
+          out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+              job.finishedReduces(), job.desiredReduces(), 
+              info.reduceSchedulable.getRunningTasks(),
+              reduceShare);
           if (advancedView) {
-            out.printf("<td>%8.1f</td>\n", info.reduceWeight);
-            out.printf("<td>%s</td>\n", info.neededReduces > 0 ?
-                       (info.reduceDeficit / 1000) + "s" : "--");
-            out.printf("<td>%d</td>\n", info.minReduces);
+            out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
           }
           out.print("</tr>\n");
         }
@@ -294,22 +326,17 @@ public class FairSchedulerServlet extend
   }
 
   /**
-   * Print the administration form at the bottom of the page, which currently
-   * only includes the button for switching between FIFO and Fair Scheduling.
+   * Obtained all initialized jobs
    */
-  private void showAdminForm(PrintWriter out, boolean advancedView) {
-    out.print("<h2>Scheduling Mode</h2>\n");
-    String curMode = scheduler.getUseFifo() ? "FIFO" : "Fair Sharing";
-    String otherMode = scheduler.getUseFifo() ? "Fair Sharing" : "FIFO";
-    String advParam = advancedView ? "?advanced" : "";
-    out.printf("<form method=\"post\" action=\"/scheduler%s\">\n", advParam);
-    out.printf("<p>The scheduler is currently using <b>%s mode</b>. " +
-        "<input type=\"submit\" value=\"Switch to %s mode.\" " + 
-        "onclick=\"return confirm('Are you sure you want to change " +
-        "scheduling mode to %s?')\" />\n",
-        curMode, otherMode, otherMode);
-    out.printf("<input type=\"hidden\" name=\"setFifo\" value=\"%s\" />",
-        !scheduler.getUseFifo());
-    out.print("</form>\n");
+  private Collection<JobInProgress> getInitedJobs() {
+    Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+    for (Iterator<JobInProgress> it = runningJobs.iterator(); it.hasNext();) {
+      JobInProgress job = it.next();
+      if (!job.inited()) {
+        it.remove();
+      }
+    }
+    return runningJobs;
   }
+
 }

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java Sun Sep 11 23:57:37 2011
@@ -35,7 +35,8 @@ public class FifoJobComparator implement
       }
     }
     if (res == 0) {
-      res = j1.hashCode() - j2.hashCode();
+      // If there is a tie, break it by job ID to get a deterministic order
+      res = j1.getJobID().compareTo(j2.getJobID());
     }
     return res;
   }