Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/29 05:50:07 UTC

svn commit: r690093 [1/2] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/fairscheduler/ src/contrib/fairscheduler/src/ src/contrib/fairscheduler/src/java/ src/contrib/fairscheduler/src/java/org/ src/contrib/fairscheduler/src/java/org/apache/ src/...

Author: omalley
Date: Thu Aug 28 20:50:06 2008
New Revision: 690093

URL: http://svn.apache.org/viewvc?rev=690093&view=rev
Log:
HADOOP-3746. Add a fair share scheduler. (Matei Zaharia via omalley)

Added:
    hadoop/core/trunk/src/contrib/fairscheduler/
    hadoop/core/trunk/src/contrib/fairscheduler/README
    hadoop/core/trunk/src/contrib/fairscheduler/build.xml
    hadoop/core/trunk/src/contrib/fairscheduler/src/
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskType.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/build.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=690093&r1=690092&r2=690093&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 28 20:50:06 2008
@@ -104,6 +104,8 @@
     HADOOP-3759. Provides ability to run memory intensive jobs without 
     affecting other running tasks on the nodes. (Hemanth Yamijala via ddas)
 
+    HADOOP-3746. Add a fair share scheduler. (Matei Zaharia via omalley)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if libhdfs.so doesn't exist.

Modified: hadoop/core/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build.xml?rev=690093&r1=690092&r2=690093&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/build.xml (original)
+++ hadoop/core/trunk/src/contrib/build.xml Thu Aug 28 20:50:06 2008
@@ -47,6 +47,7 @@
   <target name="test">
     <subant target="test">
       <fileset dir="." includes="streaming/build.xml"/>
+      <fileset dir="." includes="fairscheduler/build.xml"/>
     </subant>
   </target>
   

Added: hadoop/core/trunk/src/contrib/fairscheduler/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/README?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/README (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/README Thu Aug 28 20:50:06 2008
@@ -0,0 +1,238 @@
+# Copyright 2008 The Apache Software Foundation Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License.  You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.  See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements fair scheduling for MapReduce jobs.
+
+Fair scheduling is a method of assigning resources to jobs such that all jobs
+get, on average, an equal share of resources over time. When there is a single
+job running, that job uses the entire cluster. When other jobs are submitted,
+task slots that free up are assigned to the new jobs, so that each job gets
+roughly the same amount of CPU time. Unlike the default Hadoop scheduler, which
+forms a queue of jobs, this lets short jobs finish in reasonable time while not
+starving long jobs. It is also a reasonable way to share a cluster between a
+number of users. Finally, fair sharing can also work with job priorities - the
+priorities are used as weights to determine the fraction of total compute time
+that each job should get.
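+
+For example, with the default priority weights described under IMPLEMENTATION
+below (each priority level has 2x the weight of the next), a cluster with 100
+map slots running two NORMAL jobs and one HIGH job would settle at roughly 25,
+25 and 50 map slots respectively.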
+
+In addition to providing fair sharing, the Fair Scheduler allows assigning
+jobs to "pools" with guaranteed minimum shares. When a pool contains jobs,
+it gets at least its minimum share, but when a pool does not need its full
+capacity, the excess is shared between other running jobs. Thus pools are
+a way to guarantee capacity for particular user groups while utilizing the
+cluster efficiently when these users are not submitting any jobs. Within each
+pool, fair sharing is used to share capacity between the running jobs. By
+default, the pool is set based on the queue.name property in the jobconf,
+which will be introduced with the Hadoop Resource Manager (JIRA 3445), but it
+is also possible to have a pool per user or per Unix user group.
+
+The fair scheduler lets all jobs run by default, but it is also possible to
+limit the number of running jobs per user and per pool through the config
+file. This can be useful when a user must submit hundreds of jobs at once,
+or in general to improve performance if running too many jobs at once would
+cause too much intermediate data to be created or too much context-switching.
+Limiting the jobs does not cause any subsequently submitted jobs to fail, only
+to wait in the scheduler's queue until some of the user's earlier jobs finish.
+Jobs to run from each user/pool are chosen in order of priority and then submit
+time, as in the default FIFO scheduler in Hadoop.
+
+Finally, the fair scheduler provides several extension points where the basic
+functionality can be extended. For example, the weight calculation can be
+modified to give a priority boost to new jobs, implementing a "shortest job
+first"-like policy that further reduces response times for interactive jobs.
+
+--------------------------------------------------------------------------------
+
+BUILDING:
+
+In HADOOP_HOME, run ant package to build Hadoop and its contrib packages.
+
+--------------------------------------------------------------------------------
+
+INSTALLING:
+
+To run the fair scheduler in your Hadoop installation, you need to put it on
+the CLASSPATH. The easiest way is to copy the hadoop-*-fairscheduler.jar
+from HADOOP_HOME/build/contrib/fairscheduler to HADOOP_HOME/lib. Alternatively,
+you can modify HADOOP_CLASSPATH in conf/hadoop-env.sh to include this jar.
+
+You will also need to set the following property in the Hadoop config file
+(conf/hadoop-site.xml) to have Hadoop use the fair scheduler:
+
+<property>
+  <name>mapred.jobtracker.taskScheduler</name>
+  <value>org.apache.hadoop.mapred.FairScheduler</value>
+</property>
+
+Once you restart the cluster, you can check that the fair scheduler is running
+by going to http://<jobtracker URL>/scheduler on the JobTracker's web UI. A
+"job scheduler administration" page should be visible there. This page is
+described in the Administration section.
+
+--------------------------------------------------------------------------------
+
+CONFIGURING:
+
+The following properties can be set in hadoop-site.xml to configure the
+scheduler:
+
+mapred.fairscheduler.allocation.file:
+    Specifies an absolute path to an XML file which contains the allocations
+    for each pool, as well as the per-pool and per-user limits on number of
+    running jobs. If this property is not provided, allocations are not used.
+    This file must be in XML format, and can contain three types of elements:
+    - pool elements, which may contain elements for minMaps, minReduces and
+      maxRunningJobs (which limits the number of jobs from the pool that may
+      run at once).
+    - user elements, which may contain a maxRunningJobs element to limit that
+      user's running jobs.
+    - a userMaxJobsDefault element, which sets the running job limit for any
+      users that do not have their own elements.
+    The following example file shows how to create each type of element:
+        <?xml version="1.0"?>
+        <allocations>
+          <pool name="sample_pool">
+            <minMaps>5</minMaps>
+            <minReduces>5</minReduces>
+          </pool>
+          <user name="sample_user">
+            <maxRunningJobs>6</maxRunningJobs>
+          </user>
+          <userMaxJobsDefault>3</userMaxJobsDefault>
+        </allocations>
+    This example creates a pool sample_pool with a guarantee of 5 map slots
+    and 5 reduce slots. It also limits the number of running jobs per user
+    to 3, except for sample_user, who can run 6 jobs concurrently.
+    Any pool not defined in the allocations file will have no guaranteed
+    capacity. Also, any pool or user with no max running jobs set in the file
+    will be allowed to run an unlimited number of jobs.
+
+mapred.fairscheduler.assignmultiple:
+    Allows the scheduler to assign both a map task and a reduce task on each
+    heartbeat, which improves cluster throughput when there are many small
+    tasks to run. Boolean value, default: false.
+
+mapred.fairscheduler.sizebasedweight:
+    Take into account job sizes in calculating their weights for fair sharing.
+    By default, weights are only based on job priorities. Setting this flag to
+    true will make them based on the size of the job (number of tasks needed)
+    as well, though not linearly (the weight will be proportional to the log
+    of the number of tasks needed). This lets larger jobs get larger fair
+    shares while still providing enough of a share to small jobs to let them
+    finish fast. Boolean value, default: false.
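+    For example, with this flag enabled, a job with 100 runnable tasks gets a
+    weight proportional to log2(101), about 6.7, versus about 3.5 for a job
+    with 10 tasks - roughly twice the share for ten times the work.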
+
+mapred.fairscheduler.poolnameproperty:
+    Specify which jobconf property is used to determine the pool that a job
+    belongs in. String, default: queue.name (the same property as the queue
+    name in the Hadoop Resource Manager, JIRA 3445). For example, you can use
+    user.name or group.name to create one pool per Unix user or per Unix group.
+
+mapred.fairscheduler.weightadjuster:
+    An extensibility point that lets you specify a class to adjust the weights
+    of running jobs. This class should implement the WeightAdjuster interface.
+    There is currently one example implementation - NewJobWeightBooster, which
+    increases the weight of jobs for the first 5 minutes of their lifetime
+    to let short jobs finish faster. To use it, set the weightadjuster property
+    to the full class name, org.apache.hadoop.mapred.NewJobWeightBooster.
+    NewJobWeightBooster itself provides two parameters for setting the boost
+    factor and duration - mapred.newjobweightbooster.factor (default 3) and
+    mapred.newjobweightbooster.duration (in milliseconds, default 300000 for 5
+    minutes).
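+    For illustration only, here is a sketch of a custom adjuster; the
+    adjustWeight signature below is inferred from how FairScheduler invokes
+    it, and the "urgent" naming policy is purely hypothetical:
+        package org.apache.hadoop.mapred;
+
+        public class ExampleWeightAdjuster implements WeightAdjuster {
+          // Hypothetical policy: double the weight of jobs whose name
+          // contains "urgent"; leave all other jobs unchanged.
+          public double adjustWeight(JobInProgress job, TaskType taskType,
+              double curWeight) {
+            if (job.getProfile().getJobName().contains("urgent")) {
+              return curWeight * 2.0;
+            }
+            return curWeight;
+          }
+        }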
+
+mapred.fairscheduler.loadmanager:
+    An extensibility point that lets you specify a class that determines
+    how many maps and reduces can run on a given TaskTracker. This class should
+    extend the LoadManager class. By default the task caps in the Hadoop
+    config file are used, but this option could be used, for example, to base
+    the load on available memory and CPU utilization.
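+    For illustration only, here is a sketch of a trivial load manager; the
+    method signatures follow the CapBasedLoadManager class included in this
+    patch, and the fixed cap of 2 is an arbitrary example value:
+        package org.apache.hadoop.mapred;
+
+        public class ExampleFixedCapLoadManager extends LoadManager {
+          // Hypothetical policy: never run more than 2 tasks of each type
+          // per TaskTracker, regardless of the tracker's configured maximum.
+          @Override
+          public boolean canAssignMap(TaskTrackerStatus tracker,
+              int totalRunnableMaps) {
+            return tracker.countMapTasks() < 2;
+          }
+
+          @Override
+          public boolean canAssignReduce(TaskTrackerStatus tracker,
+              int totalRunnableReduces) {
+            return tracker.countReduceTasks() < 2;
+          }
+        }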
+
+mapred.fairscheduler.taskselector:
+    An extensibility point that lets you specify a class that determines
+    which task from within a job to launch on a given tracker. This can be
+    used to change either the locality policy (e.g. keep some jobs within
+    a particular rack) or the speculative execution algorithm (select when to
+    launch speculative tasks). The default implementation uses Hadoop's
+    default algorithms from JobInProgress. 
+
+--------------------------------------------------------------------------------
+
+ADMINISTRATION:
+
+The fair scheduler provides support for administration at runtime through
+two mechanisms. First, it is possible to modify pools' allocations and user
+and pool running job limits at runtime by editing the allocation config file.
+The scheduler will reload this file 10-15 seconds after it sees that it was
+modified. Second, current jobs, pools, and fair shares can be examined through
+the JobTracker's web interface, at http://<jobtracker URL>/scheduler. On this
+interface, it is also possible to modify jobs' priorities or move jobs from
+one pool to another and see the effects on the fair shares (this requires
+JavaScript). The following fields can be seen for each job on the web interface:
+
+Submitted - Date and time job was submitted.
+JobID, User, Name - Job identifiers as on the standard web UI.
+Pool - Current pool of job. Select another value to move job to another pool.
+Priority - Current priority. Select another value to change the job's priority.
+Maps/Reduces Finished - Number of tasks finished / total tasks.
+Maps/Reduces Running - Tasks currently running.
+Map/Reduce Fair Share - The average number of task slots that this job should
+    have at any given time according to fair sharing. The actual number of
+    tasks will go up and down depending on how much compute time the job has
+    had, but on average it will get its fair share amount.
+
+In addition, it is possible to turn on an "advanced" view for the web UI, by
+going to http://<jobtracker URL>/scheduler?advanced. This view shows four more
+columns used for calculations internally:
+
+Map/Reduce Weight - Weight of the job in the fair sharing calculations. This
+    depends on priority and potentially also on job size and job age if
+    sizebasedweight and NewJobWeightBooster are enabled.
+Map/Reduce Deficit - The job's scheduling deficit in machine-seconds: the
+    amount of resources it should have gotten according to its fair share,
+    minus the amount it actually got. Positive deficit means the job will be
+    scheduled again in the near future because it needs to catch up to its
+    fair share. The scheduler schedules jobs with higher deficit ahead of
+    others. Please see the Implementation section of this document for details.
+
+Finally, the web interface provides a button at the bottom of the page for
+switching to FIFO scheduling at runtime, in case this becomes necessary and
+restarting the MapReduce cluster is inconvenient.
+
+--------------------------------------------------------------------------------
+
+IMPLEMENTATION:
+
+There are two aspects to implementing fair scheduling: calculating each job's
+fair share, and choosing which job to run when a task slot becomes available.
+
+To select jobs to run, the scheduler keeps track of a "deficit" for
+each job - the difference between the amount of compute time it should have
+gotten on an ideal scheduler, and the amount of compute time it actually got.
+This is a measure of how "unfair" we've been to the job. Every few hundred
+milliseconds, the scheduler updates the deficit of each job by looking at
+how many tasks each job had running during this interval vs. its fair share.
+Whenever a task slot becomes available, it is assigned to the job with the
+highest deficit. There is one exception - if there are one or more jobs that
+are not meeting their pool capacity guarantees, we choose only among these
+"needy" jobs (again based on their deficit), to ensure that the scheduler
+meets pool guarantees as soon as possible.
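+
+For example, if a job's fair share is 10 slots but it had only 4 tasks running
+during a 500 ms update interval, its deficit grows by (10 - 4) * 500 = 3000
+slot-milliseconds; conversely, a job running above its fair share sees its
+deficit shrink.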
+
+The fair shares are calculated by dividing the capacity of the cluster among
+runnable jobs according to a "weight" for each job. By default the weight is
+based on priority, with each level of priority having 2x higher weight than the
+next (for example, VERY_HIGH has 4x the weight of NORMAL). However, weights can
+also be based on job sizes and ages, as described in the Configuring section.
+For jobs that are in a pool, fair shares also take into account the minimum
+guarantee for that pool. This capacity is divided among the jobs in that pool,
+again according to their weights.
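+
+For example, a pool with a guaranteed minimum of 9 map slots containing a
+NORMAL job (weight 1) and a HIGH job (weight 2) would split that guarantee
+3 / 6 between the two jobs, assuming both jobs still need at least that many
+slots.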
+
+Finally, when limits on a user's running jobs or a pool's running jobs are in
+place, we choose which jobs get to run by sorting all jobs in order of priority
+and then submit time, as in the standard Hadoop scheduler. Any jobs that fall
+after the user/pool's limit in this ordering are queued and wait until they
+can run. During this time, they are excluded from the fair sharing
+calculations and do not gain or lose deficit (their fair share is set to zero).

Added: hadoop/core/trunk/src/contrib/fairscheduler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/build.xml?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/build.xml (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/build.xml Thu Aug 28 20:50:06 2008
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="fairscheduler" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+</project>

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Thrown when the allocation file for {@link PoolManager} is malformed.  
+ */
+public class AllocationConfigurationException extends Exception {
+  private static final long serialVersionUID = 4046517047810854249L;
+  
+  public AllocationConfigurationException(String message) {
+    super(message);
+  }
+}

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A {@link LoadManager} for use by the {@link FairScheduler} that allocates
+ * tasks evenly across nodes up to their per-node maximum, using the default
+ * load management algorithm in Hadoop.
+ */
+public class CapBasedLoadManager extends LoadManager {
+  /**
+   * Determine how many tasks of a given type we want to run on a TaskTracker. 
+   * This cap is chosen based on how many tasks of that type are outstanding in
+   * total, so that when the cluster is used below capacity, tasks are spread
+   * out uniformly across the nodes rather than being clumped up on whichever
+   * machines sent out heartbeats earliest.
+   */
+  int getCap(TaskTrackerStatus tracker,
+      int totalRunnableTasks, int localMaxTasks) {
+    int numTaskTrackers = taskTrackerManager.taskTrackers().size();
+    return Math.min(localMaxTasks,
+        (int) Math.ceil((double) totalRunnableTasks / numTaskTrackers));
+  }
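+
+  // Worked example (illustrative numbers): with 10 task trackers, 25 runnable
+  // maps in the cluster, and a per-node maximum of 4 map slots, getCap returns
+  // min(4, ceil(25 / 10)) = 3, so maps are spread across all nodes rather than
+  // filling the first 7 trackers to their maximum.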
+
+  @Override
+  public boolean canAssignMap(TaskTrackerStatus tracker,
+      int totalRunnableMaps) {
+    return tracker.countMapTasks() < getCap(tracker, totalRunnableMaps,
+        tracker.getMaxMapTasks());
+  }
+
+  @Override
+  public boolean canAssignReduce(TaskTrackerStatus tracker,
+      int totalRunnableReduces) {
+    return tracker.countReduceTasks() < getCap(tracker, totalRunnableReduces,
+        tracker.getMaxReduceTasks());
+  }
+}

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * A {@link TaskSelector} implementation that wraps around the default
+ * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int)} and
+ * {@link JobInProgress#obtainNewReduceTask(TaskTrackerStatus, int)} methods
+ * in {@link JobInProgress}, using the default Hadoop locality and speculative
+ * threshold algorithms.
+ */
+public class DefaultTaskSelector extends TaskSelector {
+
+  @Override
+  public int neededSpeculativeMaps(JobInProgress job) {
+    int count = 0;
+    long time = System.currentTimeMillis();
+    double avgProgress = job.getStatus().mapProgress();
+    for (TaskInProgress tip: job.maps) {
+      if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public int neededSpeculativeReduces(JobInProgress job) {
+    int count = 0;
+    long time = System.currentTimeMillis();
+    double avgProgress = job.getStatus().reduceProgress();
+    for (TaskInProgress tip: job.reduces) {
+      if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress job)
+      throws IOException {
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    return job.obtainNewMapTask(taskTracker, numTaskTrackers,
+        taskTrackerManager.getNumberOfUniqueHosts());
+  }
+
+  @Override
+  public Task obtainNewReduceTask(TaskTrackerStatus taskTracker, JobInProgress job)
+      throws IOException {
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+        taskTrackerManager.getNumberOfUniqueHosts());
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,689 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link TaskScheduler} that implements fair sharing.
+ */
+public class FairScheduler extends TaskScheduler {
+  /** How often fair shares are re-calculated */
+  public static final long UPDATE_INTERVAL = 500;
+  public static final Log LOG = LogFactory.getLog(
+      "org.apache.hadoop.mapred.FairScheduler");
+  
+  protected PoolManager poolMgr;
+  
+  protected LoadManager loadMgr;
+  protected TaskSelector taskSelector;
+  protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
+  protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
+    new HashMap<JobInProgress, JobInfo>();
+  protected long lastUpdateTime;           // Time when we last updated infos
+  protected boolean initialized;  // Are we initialized?
+  protected boolean running;      // Are we running?
+  protected boolean useFifo;      // Set if we want to revert to FIFO behavior
+  protected boolean assignMultiple; // Simultaneously assign map and reduce?
+  protected boolean sizeBasedWeight; // Give larger weights to larger jobs
+  private Clock clock;
+  private boolean runBackgroundUpdates; // Can be set to false for testing
+  private EagerTaskInitializationListener eagerInitListener;
+  private JobListener jobListener;
+  
+  /**
+   * A class for holding per-job scheduler variables. These always contain the
+   * values of the variables at the last update(), and are used along with a
+   * time delta to update the map and reduce deficits before a new update().
+   */
+  static class JobInfo {
+    boolean runnable = false;   // Can the job run given user/pool limits?
+    double mapWeight = 0;       // Weight of job in calculation of map share
+    double reduceWeight = 0;    // Weight of job in calculation of reduce share
+    long mapDeficit = 0;        // Time deficit for maps
+    long reduceDeficit = 0;     // Time deficit for reduces
+    int runningMaps = 0;        // Maps running at last update
+    int runningReduces = 0;     // Reduces running at last update
+    int neededMaps;             // Maps needed at last update
+    int neededReduces;          // Reduces needed at last update
+    int minMaps = 0;            // Minimum maps as guaranteed by pool
+    int minReduces = 0;         // Minimum reduces as guaranteed by pool
+    double mapFairShare = 0;    // Fair share of map slots at last update
+    double reduceFairShare = 0; // Fair share of reduce slots at last update
+  }
+  
+  /**
+   * A clock class - can be mocked out for testing.
+   */
+  static class Clock {
+    long getTime() {
+      return System.currentTimeMillis();
+    }
+  }
+  
+  public FairScheduler() {
+    this(new Clock(), true);
+  }
+  
+  /**
+   * Constructor used for tests, which can change the clock and disable updates.
+   */
+  protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
+    this.clock = clock;
+    this.runBackgroundUpdates = runBackgroundUpdates;
+    this.eagerInitListener = new EagerTaskInitializationListener();
+    this.jobListener = new JobListener();
+  }
+
+  @Override
+  public void start() {
+    try {
+      Configuration conf = getConf();
+      eagerInitListener.start();
+      taskTrackerManager.addJobInProgressListener(eagerInitListener);
+      taskTrackerManager.addJobInProgressListener(jobListener);
+      poolMgr = new PoolManager(conf);
+      loadMgr = (LoadManager) ReflectionUtils.newInstance(
+          conf.getClass("mapred.fairscheduler.loadmanager", 
+              CapBasedLoadManager.class, LoadManager.class), conf);
+      loadMgr.setTaskTrackerManager(taskTrackerManager);
+      loadMgr.start();
+      taskSelector = (TaskSelector) ReflectionUtils.newInstance(
+          conf.getClass("mapred.fairscheduler.taskselector", 
+              DefaultTaskSelector.class, TaskSelector.class), conf);
+      taskSelector.setTaskTrackerManager(taskTrackerManager);
+      taskSelector.start();
+      Class<?> weightAdjClass = conf.getClass(
+          "mapred.fairscheduler.weightadjuster", null);
+      if (weightAdjClass != null) {
+        weightAdjuster = (WeightAdjuster) ReflectionUtils.newInstance(
+            weightAdjClass, conf);
+      }
+      assignMultiple = conf.getBoolean("mapred.fairscheduler.assignmultiple",
+          false);
+      sizeBasedWeight = conf.getBoolean("mapred.fairscheduler.sizebasedweight",
+          false);
+      initialized = true;
+      running = true;
+      lastUpdateTime = clock.getTime();
+      // Start a thread to update deficits every UPDATE_INTERVAL
+      if (runBackgroundUpdates)
+        new UpdateThread().start();
+      // Register servlet with JobTracker's Jetty server
+      if (taskTrackerManager instanceof JobTracker) {
+        JobTracker jobTracker = (JobTracker) taskTrackerManager;
+        StatusHttpServer infoServer = jobTracker.infoServer;
+        infoServer.setAttribute("scheduler", this);
+        infoServer.addServlet("scheduler", "/scheduler",
+            FairSchedulerServlet.class);
+      }
+    } catch (Exception e) {
+      // Can't load one of the managers - crash the JobTracker now while it is
+      // starting up so that the user notices.
+      throw new RuntimeException("Failed to start FairScheduler", e);
+    }
+    LOG.info("Successfully configured FairScheduler");
+  }
+
+  @Override
+  public void terminate() throws IOException {
+    running = false;
+    if (jobListener != null)
+      taskTrackerManager.removeJobInProgressListener(jobListener);
+    if (eagerInitListener != null)
+      taskTrackerManager.removeJobInProgressListener(eagerInitListener);
+  }
+  
+  /**
+   * Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
+   */
+  private class JobListener extends JobInProgressListener {
+    @Override
+    public void jobAdded(JobInProgress job) {
+      synchronized (FairScheduler.this) {
+        poolMgr.addJob(job);
+        JobInfo info = new JobInfo();
+        infos.put(job, info);
+        update();
+      }
+    }
+    
+    @Override
+    public void jobRemoved(JobInProgress job) {
+      synchronized (FairScheduler.this) {
+        poolMgr.removeJob(job);
+        infos.remove(job);
+      }
+    }
+  
+    @Override
+    public void jobUpdated(JobInProgress job) {
+    }
+  }
+
+  /**
+   * A thread which calls {@link FairScheduler#update()} every
+   * <code>UPDATE_INTERVAL</code> milliseconds.
+   */
+  private class UpdateThread extends Thread {
+    private UpdateThread() {
+      super("FairScheduler update thread");
+    }
+
+    public void run() {
+      while (running) {
+        try {
+          Thread.sleep(UPDATE_INTERVAL);
+          update();
+        } catch (Exception e) {
+          LOG.error("Failed to update fair share calculations", e);
+        }
+      }
+    }
+  }
+  
+  @Override
+  public synchronized List<Task> assignTasks(TaskTrackerStatus tracker)
+      throws IOException {
+    if (!initialized) // Don't try to assign tasks if we haven't yet started up
+      return null;
+    
+    // Reload allocations file if it hasn't been loaded in a while
+    poolMgr.reloadAllocsIfNecessary();
+    
+    // Compute total runnable maps and reduces
+    int runnableMaps = 0;
+    int runnableReduces = 0;
+    for (JobInProgress job: infos.keySet()) {
+      runnableMaps += runnableTasks(job, TaskType.MAP);
+      runnableReduces += runnableTasks(job, TaskType.REDUCE);
+    }
+    
+    // Scan to see whether any job needs to run a map, then a reduce
+    ArrayList<Task> tasks = new ArrayList<Task>();
+    TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
+    for (TaskType taskType: types) {
+      boolean canAssign = (taskType == TaskType.MAP) ? 
+          loadMgr.canAssignMap(tracker, runnableMaps) :
+          loadMgr.canAssignReduce(tracker, runnableReduces);
+      if (canAssign) {
+        // Figure out the jobs that need this type of task
+        List<JobInProgress> candidates = new ArrayList<JobInProgress>();
+        for (JobInProgress job: infos.keySet()) {
+          if (job.getStatus().getRunState() == JobStatus.RUNNING && 
+              neededTasks(job, taskType) > 0) {
+            candidates.add(job);
+          }
+        }
+        // Sort jobs by deficit (for Fair Sharing) or submit time (for FIFO)
+        Comparator<JobInProgress> comparator = useFifo ?
+            new FifoJobComparator() : new DeficitComparator(taskType);
+        Collections.sort(candidates, comparator);
+        for (JobInProgress job: candidates) {
+          Task task = (taskType == TaskType.MAP ? 
+              taskSelector.obtainNewMapTask(tracker, job) :
+              taskSelector.obtainNewReduceTask(tracker, job));
+          if (task != null) {
+            // Update the JobInfo for this job so we account for the launched
+            // tasks during this update interval and don't try to launch more
+            // tasks than the job needed on future heartbeats
+            JobInfo info = infos.get(job);
+            if (taskType == TaskType.MAP) {
+              info.runningMaps++;
+              info.neededMaps--;
+            } else {
+              info.runningReduces++;
+              info.neededReduces--;
+            }
+            tasks.add(task);
+            if (!assignMultiple)
+              return tasks;
+            break;
+          }
+        }
+      }
+    }
+    
+    // If no tasks were found, return null
+    return tasks.isEmpty() ? null : tasks;
+  }
+
+  /**
+   * Compare jobs by deficit for a given task type, always putting jobs whose
+   * current allocation is less than their minimum share ahead of others. This
+   * is the default job comparator used for Fair Sharing.
+   */
+  private class DeficitComparator implements Comparator<JobInProgress> {
+    private final TaskType taskType;
+
+    private DeficitComparator(TaskType taskType) {
+      this.taskType = taskType;
+    }
+
+    public int compare(JobInProgress j1, JobInProgress j2) {
+      // Put needy jobs ahead of non-needy jobs (where needy means must receive
+      // new tasks to meet slot minimum), comparing among jobs of the same type
+      // by deficit so as to put jobs with higher deficit ahead.
+      JobInfo j1Info = infos.get(j1);
+      JobInfo j2Info = infos.get(j2);
+      long deficitDif;
+      boolean j1Needy, j2Needy;
+      if (taskType == TaskType.MAP) {
+        j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
+        j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
+        deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
+      } else {
+        j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
+        j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);
+        deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
+      }
+      if (j1Needy && !j2Needy)
+        return -1;
+      else if (j2Needy && !j1Needy)
+        return 1;
+      else // Both needy or both non-needy; compare by deficit
+        return (int) Math.signum(deficitDif);
+    }
+  }
+  
+  /**
+   * Recompute the internal variables used by the scheduler - per-job weights,
+   * fair shares, deficits, minimum slot allocations, and numbers of running
+   * and needed tasks of each type. 
+   */
+  protected synchronized void update() {
+    // Remove non-running jobs
+    List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
+    for (JobInProgress job: infos.keySet()) { 
+      int runState = job.getStatus().getRunState();
+      if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED) {
+        toRemove.add(job);
+      }
+    }
+    for (JobInProgress job: toRemove) {
+      infos.remove(job);
+      poolMgr.removeJob(job);
+    }
+    // Update running jobs with deficits since last update, and compute new
+    // slot allocations, weight, shares and task counts
+    long now = clock.getTime();
+    long timeDelta = now - lastUpdateTime;
+    updateDeficits(timeDelta);
+    updateRunnability();
+    updateTaskCounts();
+    updateWeights();
+    updateMinSlots();
+    updateFairShares();
+    lastUpdateTime = now;
+  }
+  
+  private void updateDeficits(long timeDelta) {
+    for (JobInfo info: infos.values()) {
+      info.mapDeficit +=
+        (info.mapFairShare - info.runningMaps) * timeDelta;
+      info.reduceDeficit +=
+        (info.reduceFairShare - info.runningReduces) * timeDelta;
+    }
+  }
+  
+  private void updateRunnability() {
+    // Start by marking everything as not runnable
+    for (JobInfo info: infos.values()) {
+      info.runnable = false;
+    }
+    // Create a list of sorted jobs in order of start time and priority
+    List<JobInProgress> jobs = new ArrayList<JobInProgress>(infos.keySet());
+    Collections.sort(jobs, new FifoJobComparator());
+    // Mark jobs as runnable in order of start time and priority, until
+    // user or pool limits have been reached.
+    Map<String, Integer> userJobs = new HashMap<String, Integer>();
+    Map<String, Integer> poolJobs = new HashMap<String, Integer>();
+    for (JobInProgress job: jobs) {
+      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+        String user = job.getJobConf().getUser();
+        String pool = poolMgr.getPoolName(job);
+        int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
+        int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
+        if (userCount < poolMgr.getUserMaxJobs(user) && 
+            poolCount < poolMgr.getPoolMaxJobs(pool)) {
+          infos.get(job).runnable = true;
+          userJobs.put(user, userCount + 1);
+          poolJobs.put(pool, poolCount + 1);
+        }
+      }
+    }
+  }
+
+  private void updateTaskCounts() {
+    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+      JobInProgress job = entry.getKey();
+      JobInfo info = entry.getValue();
+      if (job.getStatus().getRunState() != JobStatus.RUNNING)
+        continue; // Job is still in PREP state and tasks aren't initialized
+      // Count maps
+      int totalMaps = job.numMapTasks;
+      int finishedMaps = 0;
+      int runningMaps = 0;
+      for (TaskInProgress tip: job.getMapTasks()) {
+        if (tip.isComplete()) {
+          finishedMaps += 1;
+        } else if (tip.isRunning()) {
+          runningMaps += tip.getActiveTasks().size();
+        }
+      }
+      info.runningMaps = runningMaps;
+      info.neededMaps = (totalMaps - runningMaps - finishedMaps
+          + taskSelector.neededSpeculativeMaps(job));
+      // Count reduces
+      int totalReduces = job.numReduceTasks;
+      int finishedReduces = 0;
+      int runningReduces = 0;
+      for (TaskInProgress tip: job.getReduceTasks()) {
+        if (tip.isComplete()) {
+          finishedReduces += 1;
+        } else if (tip.isRunning()) {
+          runningReduces += tip.getActiveTasks().size();
+        }
+      }
+      info.runningReduces = runningReduces;
+      info.neededReduces = (totalReduces - runningReduces - finishedReduces 
+                            + taskSelector.neededSpeculativeReduces(job));
+      // If the job was marked as not runnable due to its user or pool having
+      // too many active jobs, set the neededMaps/neededReduces to 0. We still
+      // count runningMaps/runningReduces however so we can give it a deficit.
+      if (!info.runnable) {
+        info.neededMaps = 0;
+        info.neededReduces = 0;
+      }
+    }
+  }
+
+  private void updateWeights() {
+    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+      JobInProgress job = entry.getKey();
+      JobInfo info = entry.getValue();
+      info.mapWeight = calculateWeight(job, TaskType.MAP);
+      info.reduceWeight = calculateWeight(job, TaskType.REDUCE);
+    }
+  }
+  
+  private void updateMinSlots() {
+    // Clear old minSlots
+    for (JobInfo info: infos.values()) {
+      info.minMaps = 0;
+      info.minReduces = 0;
+    }
+    // For each pool, distribute its task allocation among jobs in it that need
+    // slots. This is a little tricky since some jobs in the pool might not be
+    // able to use all the slots, e.g. they might have only a few tasks left.
+    // To deal with this, we repeatedly split up the available task slots
+    // between the jobs left, give each job min(its alloc, # of slots it needs),
+    // and redistribute any slots that are left over between jobs that still
+    // need slots on the next pass. If, in total, the jobs in the pool don't
+    // need the pool's full allocation, we leave the leftover slots for general
+    // use.
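+    // Worked example (illustrative numbers): with a pool allocation of 10
+    // slots and two equal-weight jobs that can still use 3 and 20 more slots,
+    // the first pass offers each job floor(10/2) = 5 slots, but the smaller
+    // job can take only 3; the 2 leftover slots go to the larger job on the
+    // next pass, for a final split of 3 and 7.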
+    PoolManager poolMgr = getPoolManager();
+    for (Pool pool: poolMgr.getPools()) {
+      for (final TaskType type: TaskType.values()) {
+        Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
+        int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
+        // Keep assigning slots until none are left
+        while (slotsLeft > 0) {
+          // Figure out total weight of jobs that still need slots
+          double totalWeight = 0;
+          for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
+            JobInProgress job = it.next();
+            if (isRunnable(job) &&
+                runnableTasks(job, type) > minTasks(job, type)) {
+              totalWeight += weight(job, type);
+            } else {
+              it.remove();
+            }
+          }
+          if (totalWeight == 0) // No jobs that can use more slots are left 
+            break;
+          // Assign slots to jobs, using the floor of their weight divided by
+          // total weight. This ensures that all jobs get some chance to take
+          // a slot. Then, if no slots were assigned this way, we do another
+          // pass where we use ceil, in case some slots were still left over.
+          int oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
+          for (JobInProgress job: jobs) {
+            double weight = weight(job, type);
+            int share = (int) Math.floor(oldSlots * weight / totalWeight);
+            slotsLeft = giveMinSlots(job, type, slotsLeft, share);
+          }
+          if (slotsLeft == oldSlots) {
+            // No tasks were assigned; do another pass using ceil, giving the
+            // extra slots to jobs in order of weight then deficit
+            List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);
+            Collections.sort(sortedJobs, new Comparator<JobInProgress>() {
+              public int compare(JobInProgress j1, JobInProgress j2) {
+                double dif = weight(j2, type) - weight(j1, type);
+                if (dif == 0) // Weights are equal, compare by deficit 
+                  dif = deficit(j2, type) - deficit(j1, type);
+                return (int) Math.signum(dif);
+              }
+            });
+            for (JobInProgress job: sortedJobs) {
+              double weight = weight(job, type);
+              int share = (int) Math.ceil(oldSlots * weight / totalWeight);
+              slotsLeft = giveMinSlots(job, type, slotsLeft, share);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Give up to <code>slotsToGive</code> min slots to a job (potentially fewer
+   * if either the job needs fewer slots or there aren't enough slots left).
+   * Returns the number of slots left over.
+   */
+  private int giveMinSlots(JobInProgress job, TaskType type,
+      int slotsLeft, int slotsToGive) {
+    int runnable = runnableTasks(job, type);
+    int curMin = minTasks(job, type);
+    slotsToGive = Math.min(Math.min(slotsLeft, runnable - curMin), slotsToGive);
+    slotsLeft -= slotsToGive;
+    JobInfo info = infos.get(job);
+    if (type == TaskType.MAP)
+      info.minMaps += slotsToGive;
+    else
+      info.minReduces += slotsToGive;
+    return slotsLeft;
+  }
+
+  private void updateFairShares() {
+    // Clear old fairShares
+    for (JobInfo info: infos.values()) {
+      info.mapFairShare = 0;
+      info.reduceFairShare = 0;
+    }
+    // Assign new shares, based on weight and minimum share. This is done
+    // as follows. First, we split up the available slots between all
+    // jobs according to weight. Then if there are any jobs whose minSlots is
+    // larger than their fair allocation, we give them their minSlots and
+    // remove them from the list, and start again with the amount of slots
+    // left over. This continues until all jobs' minSlots are less than their
+    // fair allocation, and at this point we know that we've met everyone's
+    // guarantee and we've split the excess capacity fairly among jobs left.
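+    // Worked example (illustrative numbers): with 10 total slots and two
+    // equal-weight jobs whose minSlots are 6 and 0, the first pass computes
+    // fair shares of 5 each; the first job's minSlots (6) exceeds its share,
+    // so it is given 6 and removed, and on the next pass the remaining 4
+    // slots all go to the second job.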
+    for (TaskType type: TaskType.values()) {
+      // Select only jobs that still need this type of task
+      HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
+      for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+        JobInProgress job = entry.getKey();
+        JobInfo info = entry.getValue();
+        if (isRunnable(job) && runnableTasks(job, type) > 0) {
+          jobsLeft.add(info);
+        }
+      }
+      double slotsLeft = getTotalSlots(type);
+      while (!jobsLeft.isEmpty()) {
+        double totalWeight = 0;
+        for (JobInfo info: jobsLeft) {
+          double weight = (type == TaskType.MAP ?
+              info.mapWeight : info.reduceWeight);
+          totalWeight += weight;
+        }
+        boolean recomputeSlots = false;
+        double oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
+        for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {
+          JobInfo info = iter.next();
+          double minSlots = (type == TaskType.MAP ?
+              info.minMaps : info.minReduces);
+          double weight = (type == TaskType.MAP ?
+              info.mapWeight : info.reduceWeight);
+          double fairShare = weight / totalWeight * oldSlots;
+          if (minSlots > fairShare) {
+            // Job needs more slots than its fair share; give it its minSlots,
+            // remove it from the list, and set recomputeSlots = true to 
+            // remember that we must loop again to redistribute unassigned slots
+            if (type == TaskType.MAP)
+              info.mapFairShare = minSlots;
+            else
+              info.reduceFairShare = minSlots;
+            slotsLeft -= minSlots;
+            iter.remove();
+            recomputeSlots = true;
+          }
+        }
+        if (!recomputeSlots) {
+          // All minimums are met. Give each job its fair share of excess slots.
+          for (JobInfo info: jobsLeft) {
+            double weight = (type == TaskType.MAP ?
+                info.mapWeight : info.reduceWeight);
+            double fairShare = weight / totalWeight * oldSlots;
+            if (type == TaskType.MAP)
+              info.mapFairShare = fairShare;
+            else
+              info.reduceFairShare = fairShare;
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  private double calculateWeight(JobInProgress job, TaskType taskType) {
+    if (!isRunnable(job)) {
+      return 0;
+    } else {
+      double weight = 1.0;
+      if (sizeBasedWeight) {
+        // Set weight based on runnable tasks
+        weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);
+      }
+      weight *= getPriorityFactor(job.getPriority());
+      if (weightAdjuster != null) {
+        // Run weight through the user-supplied weightAdjuster
+        weight = weightAdjuster.adjustWeight(job, taskType, weight);
+      }
+      return weight;
+    }
+  }
+
+  private double getPriorityFactor(JobPriority priority) {
+    switch (priority) {
+    case VERY_HIGH: return 4.0;
+    case HIGH:      return 2.0;
+    case NORMAL:    return 1.0;
+    case LOW:       return 0.5;
+    default:        return 0.25; // priority = VERY_LOW
+    }
+  }
+  
+  public PoolManager getPoolManager() {
+    return poolMgr;
+  }
+
+  public int getTotalSlots(TaskType type) {
+    int slots = 0;
+    for (TaskTrackerStatus tt: taskTrackerManager.taskTrackers()) {
+      slots += (type == TaskType.MAP ?
+          tt.getMaxMapTasks() : tt.getMaxReduceTasks());
+    }
+    return slots;
+  }
+
+  public boolean getUseFifo() {
+    return useFifo;
+  }
+  
+  public void setUseFifo(boolean useFifo) {
+    this.useFifo = useFifo;
+  }
+  
+  // Getter methods for reading JobInfo values based on TaskType, safely
+  // returning 0's for jobs with no JobInfo present.
+
+  protected int neededTasks(JobInProgress job, TaskType taskType) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return taskType == TaskType.MAP ? info.neededMaps : info.neededReduces;
+  }
+  
+  protected int runningTasks(JobInProgress job, TaskType taskType) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return taskType == TaskType.MAP ? info.runningMaps : info.runningReduces;
+  }
+
+  protected int runnableTasks(JobInProgress job, TaskType type) {
+    return neededTasks(job, type) + runningTasks(job, type);
+  }
+
+  protected int minTasks(JobInProgress job, TaskType type) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return (type == TaskType.MAP) ? info.minMaps : info.minReduces;
+  }
+
+  protected double weight(JobInProgress job, TaskType taskType) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return (taskType == TaskType.MAP ? info.mapWeight : info.reduceWeight);
+  }
+
+  protected double deficit(JobInProgress job, TaskType taskType) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    return taskType == TaskType.MAP ? info.mapDeficit : info.reduceDeficit;
+  }
+
+  protected boolean isRunnable(JobInProgress job) {
+    JobInfo info = infos.get(job);
+    if (info == null) return false;
+    return info.runnable;
+  }
+}

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Servlet for displaying fair scheduler information, installed at
+ * [job tracker URL]/scheduler when the {@link FairScheduler} is in use.
+ * 
+ * The main features are viewing each job's task counts and fair share, the
+ * ability to change job priorities and pools from the UI, and the ability to
+ * switch the scheduler to FIFO mode without restarting the JobTracker, should
+ * this be required for any reason.
+ * 
+ * There is also an "advanced" view for debugging that can be turned on by
+ * going to [job tracker URL]/scheduler?advanced.
+ */
+public class FairSchedulerServlet extends HttpServlet {
+  private static final long serialVersionUID = 9104070533067306659L;
+  private static final DateFormat DATE_FORMAT = 
+    new SimpleDateFormat("MMM dd, HH:mm");
+  
+  private FairScheduler scheduler;
+  private JobTracker jobTracker;
+  private static long lastId = 0; // Used to generate unique element IDs
+
+  @Override
+  public void init() throws ServletException {
+    super.init();
+    ServletContext servletContext = this.getServletContext();
+    this.scheduler = (FairScheduler) servletContext.getAttribute("scheduler");
+    this.jobTracker = (JobTracker) scheduler.taskTrackerManager;
+  }
+  
+  @Override
+  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    doGet(req, resp); // Same handler for both GET and POST
+  }
+  
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+    throws ServletException, IOException {
+    // If the request has a set* param, handle that and redirect to the regular
+    // view page so that the user won't resubmit the data if they hit refresh.
+    boolean advancedView = request.getParameter("advanced") != null;
+    if (request.getParameter("setFifo") != null) {
+      scheduler.setUseFifo(request.getParameter("setFifo").equals("true"));
+      response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+      return;
+    }
+    if (request.getParameter("setPool") != null) {
+      PoolManager poolMgr = scheduler.getPoolManager();
+      synchronized (poolMgr) {
+        String pool = request.getParameter("setPool");
+        String jobId = request.getParameter("jobid");
+        Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+        for (JobInProgress job: runningJobs) {
+          if (job.getProfile().getJobID().toString().equals(jobId)) {
+            poolMgr.setPool(job, pool);
+            scheduler.update();
+            break;
+          }
+        }
+      }
+      response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+      return;
+    }
+    if (request.getParameter("setPriority") != null) {
+      PoolManager poolMgr = scheduler.getPoolManager();
+      synchronized (poolMgr) {
+        JobPriority priority = JobPriority.valueOf(request.getParameter(
+            "setPriority"));
+        String jobId = request.getParameter("jobid");
+        Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+        for (JobInProgress job: runningJobs) {
+          if (job.getProfile().getJobID().toString().equals(jobId)) {
+            job.setPriority(priority);
+            scheduler.update();
+            break;
+          }
+        }
+      }
+      response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+      return;
+    }
+    // Print out the normal response
+    response.setContentType("text/html");
+    PrintWriter out = new PrintWriter(response.getOutputStream());
+    String hostname = StringUtils.simpleHostname(
+        jobTracker.getJobTrackerMachine());
+    out.print("<html><head>");
+    out.printf("<title>%s Job Scheduler Admininstration</title>\n", hostname);
+    out.printf("<META http-equiv=\"refresh\" content=\"15;URL=/scheduler%s\">",
+        advancedView ? "?advanced" : "");
+    out.print("<link rel=\"stylesheet\" type=\"text/css\" " + 
+        "href=\"/static/hadoop.css\">\n");
+    out.print("</head><body>\n");
+    out.printf("<h1><a href=\"/jobtracker.jsp\">%s</a> " + 
+        "Job Scheduler Administration</h1>\n", hostname);
+    showPools(out, advancedView);
+    showJobs(out, advancedView);
+    showAdminForm(out, advancedView);
+    out.print("</body></html>\n");
+    out.close();
+  }
+
+  /**
+   * Print a view of pools to the given output writer.
+   */
+  private void showPools(PrintWriter out, boolean advancedView) {
+    PoolManager poolManager = scheduler.getPoolManager();
+    synchronized(poolManager) {
+      out.print("<h2>Pools</h2>\n");
+      out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+      out.print("<tr><th>Pool</th><th>Running Jobs</th>" + 
+          "<th>Min Maps</th><th>Min Reduces</th>" + 
+          "<th>Running Maps</th><th>Running Reduces</th></tr>\n");
+      List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
+      Collections.sort(pools, new Comparator<Pool>() {
+        public int compare(Pool p1, Pool p2) {
+          if (p1.isDefaultPool())
+            return 1;
+          else if (p2.isDefaultPool())
+            return -1;
+          else return p1.getName().compareTo(p2.getName());
+        }});
+      for (Pool pool: pools) {
+        int runningMaps = 0;
+        int runningReduces = 0;
+        for (JobInProgress job: pool.getJobs()) {
+          JobInfo info = scheduler.infos.get(job);
+          if (info != null) {
+            runningMaps += info.runningMaps;
+            runningReduces += info.runningReduces;
+          }
+        }
+        out.print("<tr>\n");
+        out.printf("<td>%s</td>\n", pool.getName());
+        out.printf("<td>%s</td>\n", pool.getJobs().size());
+        out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+            TaskType.MAP));
+        out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(), 
+            TaskType.REDUCE));
+        out.printf("<td>%s</td>\n", runningMaps);
+        out.printf("<td>%s</td>\n", runningReduces);
+        out.print("</tr>\n");
+      }
+      out.print("</table>\n");
+    }
+  }
+
+  /**
+   * Print a view of running jobs to the given output writer.
+   */
+  private void showJobs(PrintWriter out, boolean advancedView) {
+    out.print("<h2>Running Jobs</h2>\n");
+    out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+    int colsPerTaskType = advancedView ? 5 : 3;
+    out.printf("<tr><th rowspan=2>Submitted</th>" + 
+        "<th rowspan=2>JobID</th>" +
+        "<th rowspan=2>User</th>" +
+        "<th rowspan=2>Name</th>" +
+        "<th rowspan=2>Pool</th>" +
+        "<th rowspan=2>Priority</th>" +
+        "<th colspan=%d>Maps</th>" +
+        "<th colspan=%d>Reduces</th>",
+        colsPerTaskType, colsPerTaskType);
+    out.print("</tr><tr>\n");
+    out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
+        (advancedView ? "<th>Weight</th><th>Deficit</th>" : ""));
+    out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
+        (advancedView ? "<th>Weight</th><th>Deficit</th>" : ""));
+    out.print("</tr>\n");
+    Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+    for (JobInProgress job: runningJobs) {
+      JobProfile profile = job.getProfile();
+      JobInfo info = scheduler.infos.get(job);
+      if (info == null) { // Job finished, but let's show 0's for info
+        info = new JobInfo();
+      }
+      out.print("<tr>\n");
+      out.printf("<td>%s</td>\n", DATE_FORMAT.format(
+          new Date(job.getStartTime())));
+      out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
+          profile.getJobID(), profile.getJobID());
+      out.printf("<td>%s</td>\n", profile.getUser());
+      out.printf("<td>%s</td>\n", profile.getJobName());
+      out.printf("<td>%s</td>\n", generateSelect(
+          scheduler.getPoolManager().getPoolNames(),
+          scheduler.getPoolManager().getPoolName(job),
+          "/scheduler?setPool=<CHOICE>&jobid=" + profile.getJobID() +
+          (advancedView ? "&advanced" : "")));
+      out.printf("<td>%s</td>\n", generateSelect(
+          Arrays.asList(new String[]
+              {"VERY_LOW", "LOW", "NORMAL", "HIGH", "VERY_HIGH"}),
+          job.getPriority().toString(),
+          "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID() +
+          (advancedView ? "&advanced" : "")));
+      out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
+          job.finishedMaps(), job.desiredMaps(), info.runningMaps,
+          info.mapFairShare);
+      if (advancedView) {
+        out.printf("<td>%8.1f</td>\n", info.mapWeight);
+        out.printf("<td>%s</td>\n", info.neededMaps > 0 ?
+            (info.mapDeficit / 1000) + "s" : "--");
+      }
+      out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
+          job.finishedReduces(), job.desiredReduces(), info.runningReduces,
+          info.reduceFairShare);
+      if (advancedView) {
+        out.printf("<td>%8.1f</td>\n", info.reduceWeight);
+        out.printf("<td>%s</td>\n", info.neededReduces > 0 ?
+            (info.reduceDeficit / 1000) + "s" : "--");
+      }
+      out.print("</tr>\n");
+    }
+    out.print("</table>\n");
+  }
+
+  /**
+   * Generate a HTML select control with a given list of choices and a given
+   * option selected. When the selection is changed, take the user to the
+   * <code>submitUrl</code>. The <code>submitUrl</code> can be made to include
+   * the option selected -- the first occurrence of the substring
+   * <code>&lt;CHOICE&gt;</code> will be replaced by the option chosen.
+   */
+  private String generateSelect(Iterable<String> choices, 
+      String selectedChoice, String submitUrl) {
+    StringBuilder html = new StringBuilder();
+    String id = "select" + lastId++;
+    html.append("<select id=\"" + id + "\" name=\"" + id + "\" " + 
+        "onchange=\"window.location = '" + submitUrl + 
+        "'.replace('<CHOICE>', document.getElementById('" + id +
+        "').value);\">\n");
+    for (String choice: choices) {
+      html.append(String.format("<option value=\"%s\"%s>%s</option>\n",
+          choice, (choice.equals(selectedChoice) ? " selected" : ""), choice));
+    }
+    html.append("</select>\n");
+    return html.toString();
+  }
+
+  /**
+   * Print the administration form at the bottom of the page, which currently
+   * only includes the button for switching between FIFO and Fair Scheduling.
+   */
+  private void showAdminForm(PrintWriter out, boolean advancedView) {
+    out.print("<h2>Scheduling Mode</h2>\n");
+    String curMode = scheduler.getUseFifo() ? "FIFO" : "Fair Sharing";
+    String otherMode = scheduler.getUseFifo() ? "Fair Sharing" : "FIFO";
+    String advParam = advancedView ? "?advanced" : "";
+    out.printf("<form method=\"post\" action=\"/scheduler%s\">\n", advParam);
+    out.printf("<p>The scheduler is currently using <b>%s mode</b>. " +
+        "<input type=\"submit\" value=\"Switch to %s mode.\" " + 
+        "onclick=\"return confirm('Are you sure you want to change " +
+        "scheduling mode to %s?')\" />\n",
+        curMode, otherMode, otherMode);
+    out.printf("<input type=\"hidden\" name=\"setFifo\" value=\"%s\" />",
+        !scheduler.getUseFifo());
+    out.print("</form>\n");
+  }
+}
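
The setPool, setPriority and setFifo parameters handled in doGet() above can
also be driven outside a browser. A hedged sketch (the host, port and job ID
below are hypothetical; the servlet answers these requests with a redirect
back to /scheduler):

import java.net.HttpURLConnection;
import java.net.URL;

public class SchedulerAdminDemo {
  public static void main(String[] args) throws Exception {
    // Move a job into the "research" pool.
    URL url = new URL("http://jobtracker.example.com:50030/scheduler"
        + "?setPool=research&jobid=job_200808281200_0001");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setInstanceFollowRedirects(false); // observe the 302 rather than follow it
    System.out.println("Response code: " + conn.getResponseCode());
    conn.disconnect();
  }
}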

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Comparator;
+
+/**
+ * Order {@link JobInProgress} objects by priority and then by submit time, as
+ * in the default scheduler in Hadoop.
+ */
+public class FifoJobComparator implements Comparator<JobInProgress> {
+  public int compare(JobInProgress j1, JobInProgress j2) {
+    int res = j1.getPriority().compareTo(j2.getPriority());
+    if (res == 0) {
+      if (j1.getStartTime() < j2.getStartTime()) {
+        res = -1;
+      } else {
+        res = (j1.getStartTime() == j2.getStartTime() ? 0 : 1);
+      }
+    }
+    if (res == 0) {
+      // Compare hash codes without subtracting, which could overflow int
+      res = j1.hashCode() < j2.hashCode() ? -1
+          : (j1.hashCode() == j2.hashCode() ? 0 : 1);
+    }
+    return res;
+  }
+}
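
To make the ordering concrete: JobPriority declares VERY_HIGH first, so
compareTo() sorts higher-priority jobs ahead, with submit time breaking ties.
A self-contained sketch (the Job class and enum below are simplified stand-ins
for JobInProgress and JobPriority):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class FifoOrderDemo {
  enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

  static class Job {
    final String name; final Priority priority; final long startTime;
    Job(String name, Priority priority, long startTime) {
      this.name = name; this.priority = priority; this.startTime = startTime;
    }
  }

  public static void main(String[] args) {
    List<Job> jobs = new ArrayList<Job>(Arrays.asList(
        new Job("second", Priority.NORMAL, 200),
        new Job("first",  Priority.NORMAL, 100),
        new Job("urgent", Priority.HIGH,   300)));
    Collections.sort(jobs, new Comparator<Job>() {
      public int compare(Job j1, Job j2) {
        int res = j1.priority.compareTo(j2.priority); // priority first
        if (res == 0) { // then FIFO by submit time
          res = j1.startTime < j2.startTime ? -1
              : (j1.startTime == j2.startTime ? 0 : 1);
        }
        return res;
      }
    });
    for (Job j : jobs) {
      System.out.println(j.name); // prints: urgent, first, second
    }
  }
}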

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A pluggable object that manages the load on each {@link TaskTracker}, telling
+ * the {@link TaskScheduler} when it can launch new tasks. 
+ */
+public abstract class LoadManager implements Configurable {
+  protected Configuration conf;
+  protected TaskTrackerManager taskTrackerManager;
+  
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public synchronized void setTaskTrackerManager(
+      TaskTrackerManager taskTrackerManager) {
+    this.taskTrackerManager = taskTrackerManager;
+  }
+  
+  /**
+   * Lifecycle method to allow the LoadManager to start any work in separate
+   * threads.
+   */
+  public void start() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * Lifecycle method to allow the LoadManager to stop any work it is doing.
+   */
+  public void terminate() throws IOException {
+    // do nothing
+  }
+  
+  /**
+   * Can a given {@link TaskTracker} run another map task?
+   * @param tracker The machine we wish to run a new map on
+   * @param totalRunnableMaps Total number of runnable map tasks in the cluster
+   * @return true if another map can be launched on <code>tracker</code>
+   */
+  public abstract boolean canAssignMap(TaskTrackerStatus tracker,
+      int totalRunnableMaps);
+
+  /**
+   * Can a given {@link TaskTracker} run another reduce task?
+   * @param tracker The machine we wish to run a new reduce on
+   * @param totalRunnableReduces Total number of runnable reduce tasks in the
+   *        cluster
+   * @return true if another reduce can be launched on <code>tracker</code>
+   */
+  public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
+      int totalRunnableReduces);
+}
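
As an illustration of the contract (this is not the load manager shipped with
this patch): a hypothetical subclass that simply caps each tracker at its
configured slot counts, assuming the countMapTasks()/countReduceTasks()
accessors on TaskTrackerStatus:

package org.apache.hadoop.mapred;

/** Illustrative example only; the class name is hypothetical. */
public class SlotCappedLoadManager extends LoadManager {
  @Override
  public boolean canAssignMap(TaskTrackerStatus tracker,
      int totalRunnableMaps) {
    // Allow a new map whenever the tracker is below its map slot count.
    return tracker.countMapTasks() < tracker.getMaxMapTasks();
  }

  @Override
  public boolean canAssignReduce(TaskTrackerStatus tracker,
      int totalRunnableReduces) {
    // Allow a new reduce whenever the tracker is below its reduce slot count.
    return tracker.countReduceTasks() < tracker.getMaxReduceTasks();
  }
}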

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * A {@link WeightAdjuster} implementation that gives a weight boost to new jobs
+ * for a certain amount of time -- by default, a 3x weight boost for 5 minutes.
+ * This can be used to make shorter jobs finish faster, emulating Shortest Job
+ * First scheduling while not starving long jobs. 
+ */
+public class NewJobWeightBooster extends Configured implements WeightAdjuster {
+  private static final float DEFAULT_FACTOR = 3;
+  private static final long DEFAULT_DURATION = 5 * 60 * 1000;
+
+  private float factor;
+  private long duration;
+
+  public void setConf(Configuration conf) {
+    if (conf != null) {
+      factor = conf.getFloat("mapred.newjobweightbooster.factor",
+          DEFAULT_FACTOR);
+      duration = conf.getLong("mapred.newjobweightbooster.duration",
+          DEFAULT_DURATION);
+    }
+    super.setConf(conf);
+  }
+  
+  public double adjustWeight(JobInProgress job, TaskType taskType,
+      double curWeight) {
+    long start = job.getStartTime();
+    long now = System.currentTimeMillis();
+    if (now - start < duration) {
+      return curWeight * factor;
+    } else {
+      return curWeight;
+    }
+  }
+}
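
The two property names read in setConf() above are the tuning knobs. A hedged
sketch of wiring them up (the values here are arbitrary examples):

import org.apache.hadoop.conf.Configuration;

public class BoosterConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("mapred.newjobweightbooster.factor", "2.0");      // 2x boost
    conf.set("mapred.newjobweightbooster.duration", "120000"); // 2 minutes, in ms
    NewJobWeightBooster booster = new NewJobWeightBooster();
    booster.setConf(conf);
    // Jobs started within the last two minutes now get double weight.
  }
}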

Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * A schedulable pool of jobs.
+ */
+public class Pool {
+  /** Name of the default pool, where jobs with no pool parameter go. */
+  public static final String DEFAULT_POOL_NAME = "default";
+  
+  /** Pool name. */
+  private String name;
+  
+  /** Jobs in this specific pool; does not include child pools' jobs. */
+  private Collection<JobInProgress> jobs = new ArrayList<JobInProgress>();
+
+  public Pool(String name) {
+    this.name = name;
+  }
+  
+  public Collection<JobInProgress> getJobs() {
+    return jobs;
+  }
+  
+  public void addJob(JobInProgress job) {
+    jobs.add(job);
+  }
+  
+  public void removeJob(JobInProgress job) {
+    jobs.remove(job);
+  }
+  
+  public String getName() {
+    return name;
+  }
+
+  public boolean isDefaultPool() {
+    return Pool.DEFAULT_POOL_NAME.equals(name);
+  }
+}