You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/09/11 20:49:26 UTC

svn commit: r694415 [1/2] - in /hadoop/core/trunk: conf/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/ src/contrib/capacity-scheduler/src/java/ src/contrib/capacity-scheduler/src/java/org/ src/contrib/capacity-schedul...

Author: omalley
Date: Thu Sep 11 11:49:22 2008
New Revision: 694415

URL: http://svn.apache.org/viewvc?rev=694415&view=rev
Log:
HADOOP-3445. Add capacity scheduler that provides guaranteed capacities to 
queues as a percentage of the cluster. (Vivek Ratan via omalley)

Added:
    hadoop/core/trunk/conf/capacity-scheduler.xml.template
    hadoop/core/trunk/src/contrib/capacity-scheduler/
    hadoop/core/trunk/src/contrib/capacity-scheduler/README
    hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
Removed:
    hadoop/core/trunk/conf/resource-manager-conf.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java
Modified:
    hadoop/core/trunk/conf/   (props changed)
    hadoop/core/trunk/src/contrib/build.xml
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

Propchange: hadoop/core/trunk/conf/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Sep 11 11:49:22 2008
@@ -3,3 +3,4 @@
 slaves
 masters
 hadoop-env.sh
+capacity-scheduler.xml

Added: hadoop/core/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/conf/capacity-scheduler.xml.template (added)
+++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Thu Sep 11 11:49:22 2008
@@ -0,0 +1,50 @@
+<?xml version="1.0"?>
+
+<!-- This is the configuration file for the resource manager in Hadoop. -->
+<!-- You can configure various scheduling parameters related to queues. -->
+<!-- The properties for a queue follow a naming convention,such as, -->
+<!-- mapred.capacity-scheduler.queue.<queue-name>.property-name. -->
+
+<configuration>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.guaranteed-capacity</name>
+    <value>100</value>
+    <description>Percentage of the number of slots in the cluster that are
+      guaranteed to be available for jobs in this queue.
+    </description>    
+  </property>
+  
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.reclaim-time-limit</name>
+    <value>300</value>
+    <description>The amount of time, in seconds, before which 
+      resources distributed to other queues will be reclaimed.
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.supports-priority</name>
+    <value>false</value>
+    <description>If true, priorities of jobs will be taken into 
+      account in scheduling decisions.
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.minimum-user-limit-percent</name>
+    <value>100</value>
+    <description> Each queue enforces a limit on the percentage of resources 
+    allocated to a user at any given time, if there is competition for them. 
+    This user limit can vary between a minimum and maximum value. The former
+    depends on the number of users who have submitted jobs, and the latter is
+    set to this property value. For example, suppose the value of this 
+    property is 25. If two users have submitted jobs to a queue, no single 
+    user can use more than 50% of the queue resources. If a third user submits
+    a job, no single user can use more than 33% of the queue resources. With 4 
+    or more users, no user can use more than 25% of the queue's resources. A 
+    value of 100 implies no user limits are imposed. 
+    </description>
+  </property>
+  
+</configuration>

Modified: hadoop/core/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build.xml?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/build.xml (original)
+++ hadoop/core/trunk/src/contrib/build.xml Thu Sep 11 11:49:22 2008
@@ -48,6 +48,7 @@
     <subant target="test">
       <fileset dir="." includes="streaming/build.xml"/>
       <fileset dir="." includes="fairscheduler/build.xml"/>
+      <fileset dir="." includes="capacity-scheduler/build.xml"/>
     </subant>
   </target>
   

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/README?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/README (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/README Thu Sep 11 11:49:22 2008
@@ -0,0 +1,135 @@
+# Copyright 2008 The Apache Software Foundation Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License.  You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.  See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements a scheduler for Map-Reduce jobs, called Capacity
+Task Scheduler (or just Capacity Scheduler), which provides a way to share 
+large clusters. The scheduler provides the following features (which are 
+described in detail in HADOOP-3421):
+
+* Support for queues, where a job is submitted to a queue. 
+* Queues are guaranteed a fraction of the capacity of the grid (their 
+ 'guaranteed capacity') in the sense that a certain capacity of resources 
+ will be at their disposal. All jobs submitted to the queues of an Org will 
+ have access to the capacity guaranteed to the Org.
+* Free resources can be allocated to any queue beyond its guaranteed capacity.
+ These excess allocated resources can be reclaimed and made available to 
+ another queue in order to meet its capacity guarantee.
+* The scheduler guarantees that excess resources taken from a queue will be 
+ restored to it within N minutes of its need for them.
+* Queues optionally support job priorities (disabled by default). 
+* Within a queue, jobs with higher priority will have access to the queue's 
+ resources before jobs with lower priority. However, once a job is running, it
+ will not be preempted for a higher priority job.
+* In order to prevent one or more users from monopolizing its resources, each 
+ queue enforces a limit on the percentage of resources allocated to a user at
+ any given time, if there is competition for them.
+* Support for memory-intensive jobs, wherein a job can optionally specify 
+ higher memory-requirements than the default, and the tasks of the job will 
+ only be run on TaskTrackers that have enough memory to spare. 
+
+Whenever a TaskTracker is free, the Capacity Scheduler first picks a queue 
+that needs to reclaim any resources the earliest. If no such queue is found, 
+it then picks a queue which has most free space (whose ratio of # of running
+slots to guaranteed capacity is the lowest). 
+
+--------------------------------------------------------------------------------
+
+BUILDING:
+
+In HADOOP_HOME, run ant package to build Hadoop and its contrib packages.
+
+--------------------------------------------------------------------------------
+
+INSTALLING:
+
+To run the capacity scheduler in your Hadoop installation, you need to put it 
+on the CLASSPATH. The easiest way is to copy the 
+hadoop-*-capacity-scheduler.jar from 
+HADOOP_HOME/build/contrib/capacity-scheduler to HADOOP_HOME/lib. Alternatively
+you can modify HADOOP_CLASSPATH to include this jar, in conf/hadoop-env.sh.
+
+You will also need to set the following property in the Hadoop config file
+(conf/hadoop-site.xml) to have Hadoop use the capacity scheduler:
+
+<property>
+  <name>mapred.jobtracker.taskScheduler</name>
+  <value>org.apache.hadoop.mapred.CapacityTaskScheduler</value>
+</property>
+
+--------------------------------------------------------------------------------
+
+CONFIGURATION:
+
+The following properties can be set in hadoop-site.xml to configure the
+scheduler:
+
+mapred.capacity-scheduler.reclaimCapacity.interval: 
+		The capacity scheduler checks, every 'interval' seconds, whether any 
+		capacity needs to be reclaimed. The default value is 5 seconds. 
+
+The scheduling information for queues is maintained in a configuration file
+called 'capacity-scheduler.xml'. Note that the queue names are set in 
+hadoop-site.xml. capacity-scheduler.xml sets the scheduling properties
+for each queue. See that file for configuration details, but the following 
+are the configuration options for each queue: 
+
+mapred.capacity-scheduler.queue.<queue-name>.guaranteed-capacity
+		Percentage of the number of slots in the cluster that are
+    guaranteed to be available for jobs in this queue.
+    The sum of guaranteed capacities for all queues should be less than or
+    equal 100. 
+
+mapred.capacity-scheduler.queue.<queue-name>.reclaim-time-limit
+		The amount of time, in seconds, before which resources distributed to other 
+		queues will be reclaimed.
+
+mapred.capacity-scheduler.queue.<queue-name>.supports-priority
+		If true, priorities of jobs will be taken into account in scheduling 
+		decisions.
+		
+mapred.capacity-scheduler.queue.<queue-name>.minimum-user-limit-percent
+		Each queue enforces a limit on the percentage of resources 
+    allocated to a user at any given time, if there is competition for them. 
+    This user limit can vary between a minimum and maximum value. The former
+    depends on the number of users who have submitted jobs, and the latter is
+    set to this property value. For example, suppose the value of this 
+    property is 25. If two users have submitted jobs to a queue, no single 
+    user can use more than 50% of the queue resources. If a third user submits
+    a job, no single user can use more than 33% of the queue resources. With 4 
+    or more users, no user can use more than 25% of the queue's resources. A 
+    value of 100 implies no user limits are imposed.
+		
+
+--------------------------------------------------------------------------------
+
+IMPLEMENTATION:
+
+When a TaskTracker is free, the capacity scheduler does the following (note
+that many of these steps can be, and will be, enhanced over time to provide
+better algorithms): 
+1. Decide whether to giev it a Map or Reduce task, depending on how many tasks
+the TT is already running of that type, with respect to the maximum taks it 
+can run.
+2. The scheduler then picks a queue. Queues that need to reclaim capacity 
+sooner, come before queues that don't. For queues that don't, they're ordered 
+by a ratio of (# of running tasks)/Guaranteed capacity, which indicates how 
+much 'free space' the queue has, or how much it is over capacity.
+3. A job is picked in the queue based on its state (running jobs are picked
+first), its priority (if the queue supports priorities) or its submission 
+time, and whether the job's user is under or over limit. 
+4. A task is picked from the job in the same way it always has. 
+
+Periodically, a thread checks each queue to see if it needs to reclaim any 
+capacity. Queues that are running below capacity and that have tasks waiting,
+need to reclaim capacity within a certain perdiod of time. If a queue hasn't 
+received enough tasks in a certain amount of time, tasks will be killed from 
+queues that are running over capacity.
+
+--------------------------------------------------------------------------------

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml Thu Sep 11 11:49:22 2008
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="capacity-scheduler" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+</project>

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,222 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Class providing access to resource manager configuration.
+ * 
+ * Resource manager configuration involves setting up queues, and defining
+ * various properties for the queues. These are typically read from a file 
+ * called resource-manager-conf.xml that must be in the classpath of the
+ * application. The class provides APIs to get/set and reload the 
+ * configuration for the queues.
+ */
+class CapacitySchedulerConf {
+  
+  /** Default file name from which the resource manager configuration is read. */ 
+  public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
+
+  /** Default value for guaranteed capacity of maps (as percentage).
+   * The default value is set to 100, to represent the entire queue. 
+   */ 
+  public static final float DEFAULT_GUARANTEED_CAPACITY = 100;
+
+  /** Default value for reclaiming redistributed resources.
+   * The default value is set to <code>300</code>. 
+   */ 
+  public static final int DEFAULT_RECLAIM_TIME_LIMIT = 300;
+  
+  /** Default value for minimum resource limit per user per queue, as a 
+   * percentage.
+   * The default value is set to <code>100</code>, the idea
+   * being that the default is suitable for organizations that do not
+   * require setting up any queues.
+   */ 
+  public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
+
+  private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
+    "mapred.capacity-scheduler.queue.";
+
+  private Configuration rmConf;
+  
+  /**
+   * Create a new ResourceManagerConf.
+   * This method reads from the default configuration file mentioned in
+   * {@link RM_CONF_FILE}, that must be present in the classpath of the
+   * application.
+   */
+  public CapacitySchedulerConf() {
+    rmConf = new Configuration(false);
+    rmConf.addResource(SCHEDULER_CONF_FILE);
+  }
+
+  /**
+   * Create a new ResourceManagerConf reading the specified configuration
+   * file.
+   * 
+   * @param configFile {@link Path} to the configuration file containing
+   * the resource manager configuration.
+   */
+  public CapacitySchedulerConf(Path configFile) {
+    rmConf = new Configuration(false);
+    rmConf.addResource(configFile);
+  }
+  
+  /**
+   * Get the guaranteed percentage of the cluster for the specified queue.
+   * 
+   * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return guaranteed percent of the cluster for the queue.
+   */
+  public float getGuaranteedCapacity(String queue) {
+    checkQueue(queue);
+    float result = rmConf.getFloat(toFullPropertyName(queue, 
+                                   "guaranteed-capacity"), 
+                                   DEFAULT_GUARANTEED_CAPACITY);
+    if (result < 0.0 || result > 100.0) {
+      throw new IllegalArgumentException("Illegal capacity for queue " + queue +
+                                         " of " + result);
+    }
+    return result;
+  }
+  
+  /**
+   * Get the amount of time before which redistributed resources must be
+   * reclaimed for the specified queue.
+   * 
+   * The resource manager distributes spare capacity from a free queue
+   * to ones which are in need for more resources. However, if a job 
+   * submitted to the first queue requires back the resources, they must
+   * be reclaimed within the specified configuration time limit.
+   * 
+   * This method defaults to {@link #DEFAULT_RECLAIM_TIME_LIMIT} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return reclaim time limit for this queue.
+   */
+  public int getReclaimTimeLimit(String queue) {
+    checkQueue(queue);
+    return rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"), 
+                          DEFAULT_RECLAIM_TIME_LIMIT);
+  }
+  
+  /**
+   * Set the amount of time before which redistributed resources must be
+   * reclaimed for the specified queue.
+   * @param queue Name of the queue
+   * @param value Amount of time before which the redistributed resources
+   * must be retained.
+   */
+  public void setReclaimTimeLimit(String queue, int value) {
+    checkQueue(queue);
+    rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
+  }
+  
+  /**
+   * Get whether priority is supported for this queue.
+   * 
+   * If this value is false, then job priorities will be ignored in 
+   * scheduling decisions. This method defaults to <code>false</code> if 
+   * the property is not configured for this queue. If the queue name is 
+   * unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return Whether this queue supports priority or not.
+   */
+  public boolean isPrioritySupported(String queue) {
+    checkQueue(queue);
+    return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
+                              false);  
+  }
+  
+  /**
+   * Set whether priority is supported for this queue.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param value true, if the queue must support priorities, false otherwise.
+   */
+  public void setPrioritySupported(String queue, boolean value) {
+    checkQueue(queue);
+    rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+  }
+  
+  /**
+   * Get the minimum limit of resources for any user submitting jobs in 
+   * this queue, in percentage.
+   * 
+   * This method defaults to {@link #DEFAULT_MIN_USER_LIMIT_PERCENT} if
+   * no value is specified in the configuration for this queue. If the queue
+   * name is unknown, this method throws a {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @return minimum limit of resources, in percentage, that will be 
+   * available for a user.
+   * 
+   */
+  public int getMinimumUserLimitPercent(String queue) {
+    checkQueue(queue);
+    return rmConf.getInt(toFullPropertyName(queue, 
+                          "minimum-user-limit-percent"), 
+                          DEFAULT_MIN_USER_LIMIT_PERCENT);
+  }
+  
+  /**
+   * Set the minimum limit of resources for any user submitting jobs in
+   * this queue, in percentage.
+   * 
+   * If the queue name is unknown, this method throws a 
+   * {@link IllegalArgumentException}
+   * @param queue name of the queue
+   * @param value minimum limit of resources for any user submitting jobs
+   * in this queue
+   */
+  public void setMinimumUserLimitPercent(String queue, int value) {
+    checkQueue(queue);
+    rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"), 
+                    value);
+  }
+  
+  /**
+   * Reload configuration by clearing the information read from the 
+   * underlying configuration file.
+   */
+  public synchronized void reloadConfiguration() {
+    rmConf.reloadConfiguration();
+  }
+  
+  private synchronized void checkQueue(String queue) {
+    /*if (queues == null) {
+      queues = getQueues();
+    }
+    if (!queues.contains(queue)) {
+      throw new IllegalArgumentException("Queue " + queue + " is undefined.");
+    }*/
+  }
+
+  private static final String toFullPropertyName(String queue, 
+                                                  String property) {
+      return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
+  }
+}

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,1068 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
+ * and provides a HOD-less way to share large clusters. This scheduler 
+ * provides the following features: 
+ *  * support for queues, where a job is submitted to a queue. 
+ *  * Queues are guaranteed a fraction of the capacity of the grid (their 
+ *  'guaranteed capacity') in the sense that a certain capacity of resources 
+ *  will be at their disposal. All jobs submitted to the queues of an Org 
+ *  will have access to the capacity guaranteed to the Org.
+ *  * Free resources can be allocated to any queue beyond its guaranteed 
+ *  capacity. These excess allocated resources can be reclaimed and made 
+ *  available to another queue in order to meet its capacity guarantee.
+ *  * The scheduler guarantees that excess resources taken from a queue will 
+ *  be restored to it within N minutes of its need for them.
+ *  * Queues optionally support job priorities (disabled by default). 
+ *  * Within a queue, jobs with higher priority will have access to the 
+ *  queue's resources before jobs with lower priority. However, once a job 
+ *  is running, it will not be preempted for a higher priority job.
+ *  * In order to prevent one or more users from monopolizing its resources, 
+ *  each queue enforces a limit on the percentage of resources allocated to a 
+ *  user at any given time, if there is competition for them.
+ *  
+ */
+class CapacityTaskScheduler extends TaskScheduler {
+  
+  /** 
+   * For keeping track of reclaimed capacity. 
+   * Whenever slots need to be reclaimed, we create one of these objects. 
+   * As the queue gets slots, the amount to reclaim gets decremented. if 
+   * we haven't reclaimed enough within a certain time, we need to kill 
+   * tasks. This object 'expires' either if all resources are reclaimed
+   * before the deadline, or the deadline passes . 
+   */
+  private static class ReclaimedResource {
+    // how much resource to reclaim
+    public int originalAmount;
+    // how much is to be reclaimed currently
+    public int currentAmount;
+    // the time, in millisecs, when this object expires. 
+    // This time is equal to the time when the object was created, plus
+    // the reclaim-time SLA for the queue.  
+    public long whenToExpire;
+    // we also keep track of when to kill tasks, im millisecs. This is a 
+    // fraction of 'whenToExpire', but we store it here so we don't 
+    // recompute it every time. 
+    public long whenToKill;
+    // whether tasks have been killed for this resource
+    boolean tasksKilled;
+    
+    public ReclaimedResource(int amount, long expiryTime, 
+        long whenToKill) {
+      this.originalAmount = amount;
+      this.currentAmount = amount;
+      this.whenToExpire = expiryTime;
+      this.whenToKill = whenToKill;
+      this.tasksKilled = false;
+    }
+  }
+
+  /** 
+   * This class keeps track of scheduling info for each queue for either 
+   * Map or Reduce tasks. . 
+   * This scheduling information is used by the JT to decide how to allocate
+   * tasks, redistribute capacity, etc. 
+   */
+  private static class QueueSchedulingInfo {
+    String queueName;
+
+    /** guaranteed capacity(%) is set at config time */ 
+    float guaranteedCapacityPercent = 0;
+    /** 
+     * the actual gc, which depends on how many slots are available
+     * in the cluster at any given time. 
+     */
+    int guaranteedCapacity = 0;
+    
+    /** 
+     * we also keep track of how many tasks are running for all jobs in 
+     * the queue, and how many overall tasks there are. This info is 
+     * available for each job, but keeping a sum makes our algos faster.
+     */  
+    // number of running tasks
+    int numRunningTasks = 0;
+    // number of pending tasks
+    int numPendingTasks = 0;
+    
+    /** 
+     * to handle user limits, we need to know how many users have jobs in 
+     * the queue.
+     */  
+    Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
+    /** for each user, we need to keep track of number of running tasks */
+    Map<String, Integer> numRunningTasksByUser = 
+      new HashMap<String, Integer>();
+      
+    /** min value of user limit (same for all users) */
+    int ulMin;
+    
+    /**
+     * We need to keep track of resources to reclaim. 
+     * Whenever a queue is under capacity and has tasks pending, we offer it 
+     * an SLA that gives it free slots equal to or greater than the gap in 
+     * its capacity, within a period of time (reclaimTime). 
+     * To do this, we periodically check if queues need to reclaim capacity. 
+     * If they do, we create a ResourceReclaim object. We also periodically
+     * check if a queue has received enough free slots within, say, 80% of 
+     * its reclaimTime. If not, we kill enough tasks to make up the 
+     * difference. 
+     * We keep two queues of ResourceReclaim objects. when an object is 
+     * created, it is placed in one queue. Once we kill tasks to recover 
+     * resources for that object, it is placed in an expiry queue. we need
+     * to do this to prevent creating spurious ResourceReclaim objects. We 
+     * keep a count of total resources that are being reclaimed. Thsi count 
+     * is decremented when an object expires. 
+     */
+    
+    /**
+     * reclaim time limit (in msec). This time represents the SLA we offer 
+     * a queue - a queue gets back any lost capacity withing this period 
+     * of time.  
+     */ 
+    long reclaimTime;  
+    /**
+     * the list of resources to reclaim. This list is always sorted so that
+     * resources that need to be reclaimed sooner occur earlier in the list.
+     */
+    LinkedList<ReclaimedResource> reclaimList = 
+      new LinkedList<ReclaimedResource>();
+    /**
+     * the list of resources to expire. This list is always sorted so that
+     * resources that need to be expired sooner occur earlier in the list.
+     */
+    LinkedList<ReclaimedResource> reclaimExpireList = 
+      new LinkedList<ReclaimedResource>();
+    /** 
+     * sum of all resources that are being reclaimed. 
+     * We keep this to prevent unnecessary ReclaimResource objects from being
+     * created.  
+     */
+    int numReclaimedResources = 0;
+    
+    public QueueSchedulingInfo(String queueName, float guaranteedCapacity, 
+        int ulMin, long reclaimTime) {
+      this.queueName = new String(queueName);
+      this.guaranteedCapacityPercent = guaranteedCapacity;
+      this.ulMin = ulMin;
+      this.reclaimTime = reclaimTime;
+    }
+  }
+
+  /** 
+   * This class handles the scheduling algorithms. 
+   * The algos are the same for both Map and Reduce tasks. 
+   * There may be slight variations later, in which case we can make this
+   * an abstract base class and have derived classes for Map and Reduce.  
+   */
+  private static abstract class TaskSchedulingMgr {
+
+    /** quick way to get qsi object given a queue name */
+    private Map<String, QueueSchedulingInfo> queueInfoMap = 
+      new HashMap<String, QueueSchedulingInfo>();
+    /** we keep track of the number of map or reduce slots we saw last */
+    private int numSlots = 0;
+    /** our enclosing TaskScheduler object */
+    protected CapacityTaskScheduler scheduler;
+    // for debugging
+    protected String type = null;
+
+    abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
+        JobInProgress job) throws IOException; 
+    abstract int getClusterCapacity();
+    abstract int getRunningTasks(JobInProgress job);
+    abstract int getPendingTasks(JobInProgress job);
+    abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
+
+    /**
+     * List of QSIs for assigning tasks.
+     * This list is ordered such that queues that need to reclaim capacity
+     * sooner, come before queues that don't. For queues that don't, they're
+     * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which
+     * indicates how much 'free space' the queue has, or how much it is over
+     * capacity. This ordered list is iterated over, when assigning tasks.
+     */  
+    private List<QueueSchedulingInfo> qsiForAssigningTasks = 
+      new ArrayList<QueueSchedulingInfo>();  
+    /** comparator to sort queues */ 
+    private static final class QueueComparator 
+      implements Comparator<QueueSchedulingInfo> {
+      public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
+        // if one queue needs to reclaim something and the other one doesn't, 
+        // the former is first
+        if ((0 == q1.reclaimList.size()) && (0 != q2.reclaimList.size())) {
+          return 1;
+        }
+        else if ((0 != q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
+          return -1;
+        }
+        else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
+          // neither needs to reclaim. look at how much capacity they've filled
+          double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
+          double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
+          if (r1<r2) return -1;
+          else if (r1>r2) return 1;
+          else return 0;
+        }
+        else {
+          // both have to reclaim. Look at which one needs to reclaim earlier
+          long t1 = q1.reclaimList.get(0).whenToKill;
+          long t2 = q2.reclaimList.get(0).whenToKill;
+          if (t1<t2) return -1;
+          else if (t1>t2) return 1;
+          else return 0;
+        }
+      }
+    }
+    private final static QueueComparator queueComparator = new QueueComparator();
+
+   
+    TaskSchedulingMgr(CapacityTaskScheduler sched) {
+      scheduler = sched;
+    }
+    
+    private void add(QueueSchedulingInfo qsi) {
+      queueInfoMap.put(qsi.queueName, qsi);
+      qsiForAssigningTasks.add(qsi);
+    }
+    private int getNumQueues() {
+      return queueInfoMap.size();
+    }
+    private boolean isQueuePresent(String queueName) {
+      return queueInfoMap.containsKey(queueName);
+    }
+
+    /** 
+     * Periodically, we walk through our queues to do the following: 
+     * a. Check if a queue needs to reclaim any resources within a period
+     * of time (because it's running below capacity and more tasks are
+     * waiting)
+     * b. Check if a queue hasn't received enough of the resources it needed
+     * to be reclaimed and thus tasks need to be killed.
+     */
+    private synchronized void reclaimCapacity() {
+      int tasksToKill = 0;
+      // with only one queue, there's nothing to do
+      if (queueInfoMap.size() < 2) {
+        return;
+      }
+      QueueSchedulingInfo lastQsi = 
+        qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
+      long currentTime = scheduler.clock.getTime();
+      for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+        // is there any resource that needs to be reclaimed? 
+        if ((!qsi.reclaimList.isEmpty()) &&  
+            (qsi.reclaimList.getFirst().whenToKill < 
+              currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) {
+          // make a note of how many tasks to kill to claim resources
+          tasksToKill += qsi.reclaimList.getFirst().currentAmount;
+          // move this to expiry list
+          ReclaimedResource r = qsi.reclaimList.remove();
+          qsi.reclaimExpireList.add(r);
+        }
+        // is there any resource that needs to be expired?
+        if ((!qsi.reclaimExpireList.isEmpty()) && 
+            (qsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
+          ReclaimedResource r = qsi.reclaimExpireList.remove();
+          qsi.numReclaimedResources -= r.originalAmount;
+        }
+        // do we need to reclaim a resource later? 
+        // if no queue is over capacity, there's nothing to reclaim
+        if (lastQsi.numRunningTasks <= lastQsi.guaranteedCapacity) {
+          continue;
+        }
+        if (qsi.numRunningTasks < qsi.guaranteedCapacity) {
+          // usedCap is how much capacity is currently accounted for
+          int usedCap = qsi.numRunningTasks + qsi.numReclaimedResources;
+          // see if we have remaining capacity and if we have enough pending 
+          // tasks to use up remaining capacity
+          if ((usedCap < qsi.guaranteedCapacity) && 
+              ((qsi.numPendingTasks - qsi.numReclaimedResources)>0)) {
+            // create a request for resources to be reclaimed
+            int amt = Math.min((qsi.guaranteedCapacity-usedCap), 
+                (qsi.numPendingTasks - qsi.numReclaimedResources));
+            // create a rsource object that needs to be reclaimed some time
+            // in the future
+            long whenToKill = qsi.reclaimTime - 
+              (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * 
+                  scheduler.taskTrackerManager.getNextHeartbeatInterval());
+            if (whenToKill < 0) whenToKill = 0;
+            qsi.reclaimList.add(new ReclaimedResource(amt, 
+                currentTime + qsi.reclaimTime, 
+                currentTime + whenToKill));
+            qsi.numReclaimedResources += amt;
+            LOG.debug("Queue " + qsi.queueName + " needs to reclaim " + 
+                amt + " resources");
+          }
+        }
+      }
+      // kill tasks to reclaim capacity
+      if (0 != tasksToKill) {
+        killTasks(tasksToKill);
+      }
+    }
+
+    // kill 'tasksToKill' tasks 
+    private void killTasks(int tasksToKill)
+    {
+      /* 
+       * There are a number of fair ways in which one can figure out how
+       * many tasks to kill from which queue, so that the total number of
+       * tasks killed is equal to 'tasksToKill'.
+       * Maybe the best way is to keep a global ordering of running tasks
+       * and kill the ones that ran last, irrespective of what queue or 
+       * job they belong to. 
+       * What we do here is look at how many tasks is each queue running
+       * over capacity, and use that as a weight to decide how many tasks
+       * to kill from that queue.
+       */ 
+      
+      // first, find out all queues over capacity
+      int loc;
+      for (loc=0; loc<qsiForAssigningTasks.size(); loc++) {
+        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(loc);
+        if (qsi.numRunningTasks > qsi.guaranteedCapacity) {
+          // all queues from here onwards are running over cap
+          break;
+        }
+      }
+      // if some queue needs to reclaim cap, there must be at least one queue
+      // over cap. But check, just in case. 
+      if (loc == qsiForAssigningTasks.size()) {
+        LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill + 
+            " tasks but there is no queue over capacity.");
+        return;
+      }
+      // calculate how many total tasks are over cap
+      int tasksOverCap = 0;
+      for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
+        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
+        tasksOverCap += (qsi.numRunningTasks - qsi.guaranteedCapacity);
+      }
+      // now kill tasks from each queue
+      for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
+        QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
+        killTasksFromQueue(qsi, (int)Math.round(
+            ((double)(qsi.numRunningTasks - qsi.guaranteedCapacity))*
+            tasksToKill/(double)tasksOverCap));
+      }
+    }
+
+    // kill 'tasksToKill' tasks from queue represented by qsi
+    private void killTasksFromQueue(QueueSchedulingInfo qsi, int tasksToKill) {
+      // we start killing as many tasks as possible from the jobs that started
+      // last. This way, we let long-running jobs complete faster.
+      int tasksKilled = 0;
+      JobInProgress jobs[] = scheduler.jobQueuesManager.
+        getRunningJobQueue(qsi.queueName).toArray(new JobInProgress[0]);
+      for (int i=jobs.length-1; i>=0; i--) {
+        if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) {
+          continue;
+        }
+        tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled);
+        if (tasksKilled >= tasksToKill) break;
+      }
+    }
+   
+    // return the TaskAttemptID of the running task, if any, that has made 
+    // the least progress.
+    TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) {
+      double leastProgress = 1;
+      TaskAttemptID tID = null;
+      for (Iterator<TaskAttemptID> it = 
+        tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
+        TaskAttemptID taskid = it.next();
+        TaskStatus status = tip.getTaskStatus(taskid);
+        if (status.getRunState() == TaskStatus.State.RUNNING) {
+          if (status.getProgress() < leastProgress) {
+            leastProgress = status.getProgress();
+            tID = taskid;
+          }
+        }
+      }
+      return tID;
+    }
+    
+    
+    /**
+     * Update individual QSI objects.
+     * We don't need exact information for all variables, just enough for us
+     * to make scheduling decisions. For example, we don't need an exact count
+     * of numRunningTasks. Once we count upto the grid capacity (gcSum), any
+     * number beyond that will make no difference.
+     * */
+    private synchronized void updateQSIObjects() {
+      // if # of slots have changed since last time, update. 
+      // First, compute whether the total number of TT slots have changed
+      int slotsDiff = getClusterCapacity()- numSlots;
+      numSlots += slotsDiff;
+      for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+        // compute new GCs and ACs, if TT slots have changed
+        if (slotsDiff != 0) {
+          qsi.guaranteedCapacity +=
+            (qsi.guaranteedCapacityPercent*slotsDiff/100);
+        }
+        qsi.numRunningTasks = 0;
+        qsi.numPendingTasks = 0;
+        for (String s: qsi.numRunningTasksByUser.keySet()) {
+          qsi.numRunningTasksByUser.put(s, 0);
+        }
+        // update stats on running jobs
+        for (JobInProgress j: 
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+          if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+            continue;
+          }
+          qsi.numRunningTasks += getRunningTasks(j);
+          Integer i = qsi.numRunningTasksByUser.get(j.getProfile().getUser());
+          qsi.numRunningTasksByUser.put(j.getProfile().getUser(), 
+              i+getRunningTasks(j));
+          qsi.numPendingTasks += getPendingTasks(j);
+          LOG.debug("updateQSI: job " + j.toString() + ": run(m) = " + 
+              j.runningMaps() + ", run(r) = " + j.runningReduces() + 
+              ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
+              j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
+              ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + 
+              j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks 
+              + ", total(m) = " + j.numMapTasks + ", total(r) = " + 
+              j.numReduceTasks);
+          /* 
+           * it's fine walking down the entire list of running jobs - there
+           * probably will not be many, plus, we may need to go through the
+           * list to compute numRunningTasksByUser. If this is expensive, we
+           * can keep a list of running jobs per user. Then we only need to
+           * consider the first few jobs per user.
+           */ 
+        }
+        // update stats on waiting jobs
+        for (JobInProgress j: 
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
+          // pending tasks
+          if (qsi.numPendingTasks > getClusterCapacity()) {
+            // that's plenty. no need for more computation
+            break;
+          }
+          qsi.numPendingTasks += getPendingTasks(j);
+        }
+      }
+    }
+
+    
+    void jobAdded(JobInProgress job) {
+      // update qsi 
+      QueueSchedulingInfo qsi = 
+        queueInfoMap.get(job.getProfile().getQueueName());
+      // qsi shouldn't be null
+      
+      // update user-specific info
+      Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
+      if (null == i) {
+        qsi.numJobsByUser.put(job.getProfile().getUser(), 1);
+        qsi.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
+      }
+      else {
+        i++;
+      }
+    }
+    void jobRemoved(JobInProgress job) {
+      // update qsi 
+      QueueSchedulingInfo qsi = 
+        queueInfoMap.get(job.getProfile().getQueueName());
+      // qsi shouldn't be null
+      
+      // update numJobsByUser
+      LOG.debug("JOb to be removed for user " + job.getProfile().getUser());
+      Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
+      i--;
+      if (0 == i.intValue()) {
+        qsi.numJobsByUser.remove(job.getProfile().getUser());
+        qsi.numRunningTasksByUser.remove(job.getProfile().getUser());
+        LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
+      }
+      else {
+        LOG.debug("User still has jobs, number of users = " + qsi.numJobsByUser.size());
+      }
+    }
+
+    // called when a task is allocated to queue represented by qsi. 
+    // update our info about reclaimed resources
+    private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
+      // if we needed to reclaim resources, we have reclaimed one
+      if (qsi.reclaimList.isEmpty()) {
+        return;
+      }
+      ReclaimedResource res = qsi.reclaimList.getFirst();
+      res.currentAmount--;
+      if (0 == res.currentAmount) {
+        // move this resource to the expiry list
+        ReclaimedResource r = qsi.reclaimList.remove();
+        qsi.reclaimExpireList.add(r);
+      }
+    }
+
+    private synchronized void updateCollectionOfQSIs() {
+      Collections.sort(qsiForAssigningTasks, queueComparator);
+    }
+
+
+    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+      // what is our current capacity? It's GC if we're running below GC. 
+      // If we're running over GC, then its #running plus 1 (which is the 
+      // extra slot we're getting). 
+      int currentCapacity;
+      if (qsi.numRunningTasks < qsi.guaranteedCapacity) {
+        currentCapacity = qsi.guaranteedCapacity;
+      }
+      else {
+        currentCapacity = qsi.numRunningTasks+1;
+      }
+      int limit = Math.max((int)(Math.ceil((double)currentCapacity/
+          (double)qsi.numJobsByUser.size())), 
+          (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
+      if (qsi.numRunningTasksByUser.get(
+          j.getProfile().getUser()) >= limit) {
+        LOG.debug("User " + j.getProfile().getUser() + 
+            " is over limit, num running tasks = " + 
+            qsi.numRunningTasksByUser.get(j.getProfile().getUser()) + 
+            ", limit = " + limit);
+        return true;
+      }
+      else {
+        return false;
+      }
+    }
+    
+    private Task getTaskFromQueue(TaskTrackerStatus taskTracker, 
+        QueueSchedulingInfo qsi) throws IOException {
+      Task t = null;
+      // keep track of users over limit
+      Set<String> usersOverLimit = new HashSet<String>();
+      // look at running jobs first
+      for (JobInProgress j:
+        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+        // some jobs may be in the running queue but may have completed 
+        // and not yet have been removed from the running queue
+        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+          continue;
+        }
+        // is this job's user over limit?
+        if (isUserOverLimit(j, qsi)) {
+          // user over limit. 
+          usersOverLimit.add(j.getProfile().getUser());
+          continue;
+        }
+        // We found a suitable job. Get task from it.
+        t = obtainNewTask(taskTracker, j);
+        if (t != null) {
+          LOG.debug("Got task from job " + 
+              j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+          return t;
+        }
+      }
+      
+      // if we're here, we found nothing in the running jobs. Time to 
+      // look at waiting jobs. Get first job of a user that is not over limit
+      for (JobInProgress j: 
+        scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
+        // is this job's user over limit?
+        if (usersOverLimit.contains(j.getProfile().getUser())) {
+          // user over limit. 
+          continue;
+        }
+        // this job is a candidate for running. Initialize it, move it
+        // to run queue
+        j.initTasks();
+        scheduler.jobQueuesManager.jobUpdated(j);
+        // We found a suitable job. Get task from it.
+        t = obtainNewTask(taskTracker, j);
+        if (t != null) {
+          LOG.debug("Getting task from job " + 
+              j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+          return t;
+        }
+      }
+      
+      // if we're here, we haven't found anything. This could be because 
+      // there is nothing to run, or that the user limit for some user is 
+      // too strict, i.e., there's at least one user who doesn't have
+      // enough tasks to satisfy his limit. If it's the later case, look at 
+      // jobs without considering user limits, and get task from first 
+      // eligible job
+      if (usersOverLimit.size() > 0) {
+        for (JobInProgress j:
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+          if ((j.getStatus().getRunState() == JobStatus.RUNNING) && 
+              (usersOverLimit.contains(j.getProfile().getUser()))) {
+            t = obtainNewTask(taskTracker, j);
+            if (t != null) {
+              LOG.debug("Getting task from job " + 
+                  j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+              return t;
+            }
+          }
+        }
+        // look at waiting jobs the same way
+        for (JobInProgress j: 
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
+          if (usersOverLimit.contains(j.getProfile().getUser())) {
+            j.initTasks();
+            scheduler.jobQueuesManager.jobUpdated(j);
+            t = obtainNewTask(taskTracker, j);
+            if (t != null) {
+              LOG.debug("Getting task from job " + 
+                  j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+              return t;
+            }
+          }
+        }
+      }
+      
+      return null;
+    }
+    
+    private List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+      Task t = null;
+
+      /* 
+       * update all our QSI objects.
+       * This involves updating each qsi structure. This operation depends
+       * on the number of running jobs in a queue, and some waiting jobs. If it
+       * becomes expensive, do it once every few hearbeats only.
+       */ 
+      updateQSIObjects();
+      LOG.debug("After updating QSI objects:");
+      printQSIs();
+      /*
+       * sort list of qeues first, as we want queues that need the most to
+       * get first access. If this is expensive, sort every few heartbeats.
+       * We're only sorting a collection of queues - there shouldn't be many.
+       */
+      updateCollectionOfQSIs();
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        t = getTaskFromQueue(taskTracker, qsi);
+        if (t!= null) {
+          // we have a task. Update reclaimed resource info
+          updateReclaimedResources(qsi);
+          return Collections.singletonList(t);
+        }
+      }        
+
+      // nothing to give
+      return null;
+    }
+    
+    private void printQSIs() {
+      StringBuffer s = new StringBuffer();
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        Collection<JobInProgress> runJobs = 
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+        Collection<JobInProgress> waitJobs = 
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName);
+        s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
+            qsi.numRunningTasks + ", gc=" + qsi.guaranteedCapacity + 
+            ", wait=" + qsi.numPendingTasks + ", run jobs="+ runJobs.size() + 
+            ", wait jobs=" + waitJobs.size() + "*** ");
+      }
+      LOG.debug(s);
+    }
+
+  }
+
+  /**
+   * The scheduling algorithms for map tasks. 
+   */
+  private static class MapSchedulingMgr extends TaskSchedulingMgr {
+    MapSchedulingMgr(CapacityTaskScheduler dad) {
+      super(dad);
+      type = new String("map");
+    }
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    throws IOException {
+      ClusterStatus clusterStatus = 
+        scheduler.taskTrackerManager.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+      return job.obtainNewMapTask(taskTracker, numTaskTrackers, 
+          scheduler.taskTrackerManager.getNumberOfUniqueHosts());
+    }
+    int getClusterCapacity() {
+      return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks();
+    }
+    int getRunningTasks(JobInProgress job) {
+      return job.runningMaps();
+    }
+    int getPendingTasks(JobInProgress job) {
+      return job.pendingMaps();
+    }
+    int killTasksFromJob(JobInProgress job, int tasksToKill) {
+      /*
+       * We'd like to kill tasks that ran the last, or that have made the
+       * least progress.
+       * Ideally, each job would have a list of tasks, sorted by start 
+       * time or progress. That's a lot of state to keep, however. 
+       * For now, we do something a little different. We first try and kill
+       * non-local tasks, as these can be run anywhere. For each TIP, we 
+       * kill the task that has made the least progress, if the TIP has
+       * more than one active task. 
+       * We then look at tasks in runningMapCache.
+       */
+      int tasksKilled = 0;
+      
+      /* 
+       * For non-local running maps, we 'cheat' a bit. We know that the set
+       * of non-local running maps has an insertion order such that tasks 
+       * that ran last are at the end. So we iterate through the set in 
+       * reverse. This is OK because even if the implementation changes, 
+       * we're still using generic set iteration and are no worse of.
+       */ 
+      TaskInProgress[] tips = 
+        job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]);
+      for (int i=tips.length-1; i>=0; i--) {
+        // pick the tast attempt that has progressed least
+        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
+        if (null != tid) {
+          if (tips[i].killTask(tid, false)) {
+            if (++tasksKilled >= tasksToKill) {
+              return tasksKilled;
+            }
+          }
+        }
+      }
+      // now look at other running tasks
+      for (Set<TaskInProgress> s: job.getRunningMapCache().values()) {
+        for (TaskInProgress tip: s) {
+          TaskAttemptID tid = getRunningTaskWithLeastProgress(tip);
+          if (null != tid) {
+            if (tip.killTask(tid, false)) {
+              if (++tasksKilled >= tasksToKill) {
+                return tasksKilled;
+              }
+            }
+          }
+        }
+      }
+      return tasksKilled;
+    }
+
+  }
+
+  /**
+   * The scheduling algorithms for reduce tasks. 
+   */
+  private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
+    ReduceSchedulingMgr(CapacityTaskScheduler dad) {
+      super(dad);
+      type = new String("reduce");
+    }
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    throws IOException {
+      ClusterStatus clusterStatus = 
+        scheduler.taskTrackerManager.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+      return job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+          scheduler.taskTrackerManager.getNumberOfUniqueHosts());
+    }
+    int getClusterCapacity() {
+      return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+    }
+    int getRunningTasks(JobInProgress job) {
+      return job.runningReduces();
+    }
+    int getPendingTasks(JobInProgress job) {
+      return job.pendingReduces();
+    }
+    int killTasksFromJob(JobInProgress job, int tasksToKill) {
+      /* 
+       * For reduces, we 'cheat' a bit. We know that the set
+       * of running reduces has an insertion order such that tasks 
+       * that ran last are at the end. So we iterate through the set in 
+       * reverse. This is OK because even if the implementation changes, 
+       * we're still using generic set iteration and are no worse of.
+       */ 
+      int tasksKilled = 0;
+      TaskInProgress[] tips = 
+        job.getRunningReduces().toArray(new TaskInProgress[0]);
+      for (int i=tips.length-1; i>=0; i--) {
+        // pick the tast attempt that has progressed least
+        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
+        if (null != tid) {
+          if (tips[i].killTask(tid, false)) {
+            if (++tasksKilled >= tasksToKill) {
+              return tasksKilled;
+            }
+          }
+        }
+      }
+      return tasksKilled;
+    }
+  }
+  
+  /** the scheduling mgrs for Map and Reduce tasks */ 
+  protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
+  protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
+  
+  /** name of the default queue. */ 
+  static final String DEFAULT_QUEUE_NAME = "default";
+  
+  /** how often does redistribution thread run (in msecs)*/
+  private static long RECLAIM_CAPACITY_INTERVAL;
+  /** we start killing tasks to reclaim capacity when we have so many 
+   * heartbeats left. */
+  private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
+
+  private static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
+  protected JobQueuesManager jobQueuesManager;
+  protected CapacitySchedulerConf rmConf;
+  /** whether scheduler has started or not */
+  private boolean started = false;
+  
+  /**
+   * Used to distribute/reclaim excess capacity among queues
+   */ 
+  class ReclaimCapacity implements Runnable {
+    public ReclaimCapacity() {
+    }
+    public void run() {
+      while (true) {
+        try {
+          Thread.sleep(RECLAIM_CAPACITY_INTERVAL);
+          if (stopReclaim) { 
+            break;
+          }
+          reclaimCapacity();
+        } catch (InterruptedException t) {
+          break;
+        } catch (Throwable t) {
+          LOG.error("Error in redistributing capacity:\n" +
+                    StringUtils.stringifyException(t));
+        }
+      }
+    }
+  }
+  private Thread reclaimCapacityThread = null;
+  /** variable to indicate that thread should stop */
+  private boolean stopReclaim = false;
+
+  /**
+   * A clock class - can be mocked out for testing.
+   */
+  static class Clock {
+    long getTime() {
+      return System.currentTimeMillis();
+    }
+  }
+  private Clock clock;
+
+  
+  public CapacityTaskScheduler() {
+    this(new Clock());
+  }
+  
+  // for testing
+  public CapacityTaskScheduler(Clock clock) {
+    this.jobQueuesManager = new JobQueuesManager(this);
+    this.clock = clock;
+  }
+  
+  /** mostly for testing purposes */
+  public void setResourceManagerConf(CapacitySchedulerConf conf) {
+    this.rmConf = conf;
+  }
+  
+  @Override
+  public synchronized void start() throws IOException {
+    if (started) return;
+    super.start();
+    RECLAIM_CAPACITY_INTERVAL = 
+      conf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
+    RECLAIM_CAPACITY_INTERVAL *= 1000;
+    // initialize our queues from the config settings
+    if (null == rmConf) {
+      rmConf = new CapacitySchedulerConf();
+    }
+    // read queue info from config file
+    Set<String> queues = taskTrackerManager.getQueueManager().getQueues();
+    float totalCapacity = 0.0f;
+    for (String queueName: queues) {
+      float gc = rmConf.getGuaranteedCapacity(queueName); 
+      totalCapacity += gc;
+      int ulMin = rmConf.getMinimumUserLimitPercent(queueName); 
+      long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName);
+      // reclaimTimeLimit is the time(in millisec) within which we need to
+      // reclaim capacity. 
+      // create queue scheduling objects for Map and Reduce
+      mapScheduler.add(new QueueSchedulingInfo(queueName, gc, 
+          ulMin, reclaimTimeLimit));
+      reduceScheduler.add(new QueueSchedulingInfo(queueName, gc, 
+          ulMin, reclaimTimeLimit));
+
+      // create the queues of job objects
+      boolean supportsPrio = rmConf.isPrioritySupported(queueName);
+      jobQueuesManager.createQueue(queueName, supportsPrio);
+    }
+    if (totalCapacity > 100.0) {
+      throw new IllegalArgumentException("Sum of queue capacities over 100% at "
+                                         + totalCapacity);
+    }
+    
+    // Sanity check: there should be at least one queue. 
+    if (0 == mapScheduler.getNumQueues()) {
+      throw new IllegalStateException("System has no queue configured");
+    }
+    
+    // check if there's a queue with the default name. If not, we quit.
+    if (!mapScheduler.isQueuePresent(DEFAULT_QUEUE_NAME)) {
+      throw new IllegalStateException("System has no default queue configured");
+    }
+    
+    // listen to job changes
+    taskTrackerManager.addJobInProgressListener(jobQueuesManager);
+
+    // start thread for redistributing capacity
+    this.reclaimCapacityThread = 
+      new Thread(new ReclaimCapacity(),"reclaimCapacity");
+    this.reclaimCapacityThread.start();
+    started = true;
+    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");
+  }
+  
+  @Override
+  public synchronized void terminate() throws IOException {
+    if (!started) return;
+    if (jobQueuesManager != null) {
+      taskTrackerManager.removeJobInProgressListener(
+          jobQueuesManager);
+    }
+    // tell the reclaim thread to stop
+    stopReclaim = true;
+    started = false;
+    super.terminate();
+  }
+  
+  @Override
+  public synchronized void setConf(Configuration conf) {
+    super.setConf(conf);
+  }
+
+  void reclaimCapacity() {
+    mapScheduler.reclaimCapacity();
+    reduceScheduler.reclaimCapacity();
+  }
+  
+  /**
+   * provided for the test classes
+   * lets you update the QSI objects and sorted collection
+   */ 
+  void updateQSIInfo() {
+    mapScheduler.updateQSIObjects();
+    mapScheduler.updateCollectionOfQSIs();
+    reduceScheduler.updateQSIObjects();
+    reduceScheduler.updateCollectionOfQSIs();
+  }
+  
+  /* 
+   * The grand plan for assigning a task. 
+   * First, decide whether a Map or Reduce task should be given to a TT 
+   * (if the TT can accept either). 
+   * Next, pick a queue. We only look at queues that need a slot. Among
+   * these, we first look at queues whose ac is less than gc (queues that 
+   * gave up capacity in the past). Next, we look at any other queue that
+   * needs a slot. 
+   * Next, pick a job in a queue. we pick the job at the front of the queue
+   * unless its user is over the user limit. 
+   * Finally, given a job, pick a task from the job. 
+   *  
+   */
+  @Override
+  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+      throws IOException {
+    
+    List<Task> tasks = null;
+    /* 
+     * If TT has Map and Reduce slot free, we need to figure out whether to
+     * give it a Map or Reduce task.
+     * Number of ways to do this. For now, base decision on how much is needed
+     * versus how much is used (default to Map, if equal).
+     */
+    LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() + 
+        ", run maps=" + taskTracker.countMapTasks() + ", max reds=" + 
+        taskTracker.getMaxReduceTasks() + ", run reds=" + 
+        taskTracker.countReduceTasks() + ", map cap=" + 
+        mapScheduler.getClusterCapacity() + ", red cap = " + 
+        reduceScheduler.getClusterCapacity());
+    int maxMapTasks = taskTracker.getMaxMapTasks();
+    int currentMapTasks = taskTracker.countMapTasks();
+    int maxReduceTasks = taskTracker.getMaxReduceTasks();
+    int currentReduceTasks = taskTracker.countReduceTasks();
+    if ((maxReduceTasks - currentReduceTasks) > 
+    (maxMapTasks - currentMapTasks)) {
+      tasks = reduceScheduler.assignTasks(taskTracker);
+      // if we didn't get any, look at map tasks, if TT has space
+      if ((null == tasks) && (maxMapTasks > currentMapTasks)) {
+        tasks = mapScheduler.assignTasks(taskTracker);
+      }
+    }
+    else {
+      tasks = mapScheduler.assignTasks(taskTracker);
+      // if we didn't get any, look at red tasks, if TT has space
+      if ((null == tasks) && (maxReduceTasks > currentReduceTasks)) {
+        tasks = reduceScheduler.assignTasks(taskTracker);
+      }
+    }
+    return tasks;
+  }
+  
+  // called when a job is added
+  synchronized void jobAdded(JobInProgress job) {
+    // let our map and reduce schedulers know this, so they can update 
+    // user-specific info
+    mapScheduler.jobAdded(job);
+    reduceScheduler.jobAdded(job);
+  }
+
+  // called when a job is removed
+  synchronized void jobRemoved(JobInProgress job) {
+    // let our map and reduce schedulers know this, so they can update 
+    // user-specific info
+    mapScheduler.jobRemoved(job);
+    reduceScheduler.jobRemoved(job);
+  }
+  
+}
+

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link JobInProgressListener} that maintains the jobs being managed in
+ * one or more queues. 
+ */
+class JobQueuesManager extends JobInProgressListener {
+
+  /* 
+   * If a queue supports priorities, waiting jobs must be 
+   * sorted on priorities, and then on their start times (technically, 
+   * their insertion time.  
+   * If a queue doesn't support priorities, waiting jobs are
+   * sorted based on their start time.  
+   * Running jobs are not sorted. A job that started running earlier
+   * is ahead in the queue, so insertion should be at the tail.
+   */
+  
+  // comparator for jobs in queues that support priorities
+  private static final Comparator<JobInProgress> PRIORITY_JOB_COMPARATOR
+    = new Comparator<JobInProgress>() {
+    public int compare(JobInProgress o1, JobInProgress o2) {
+      // Look at priority.
+      int res = o1.getPriority().compareTo(o2.getPriority());
+      if (res == 0) {
+        // the job that started earlier wins
+        if (o1.getStartTime() < o2.getStartTime()) {
+          res = -1;
+        } else {
+          res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
+        }
+      }
+      if (res == 0) {
+        res = o1.getJobID().compareTo(o2.getJobID());
+      }
+      return res;
+    }
+  };
+  // comparator for jobs in queues that don't support priorities
+  private static final Comparator<JobInProgress> STARTTIME_JOB_COMPARATOR
+    = new Comparator<JobInProgress>() {
+    public int compare(JobInProgress o1, JobInProgress o2) {
+      // the job that started earlier wins
+      if (o1.getStartTime() < o2.getStartTime()) {
+        return -1;
+      } else {
+        return (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
+      }
+    }
+  };
+  
+  // class to store queue info
+  private static class QueueInfo {
+
+    // whether the queue supports priorities
+    boolean supportsPriorities;
+    // maintain separate collections of running & waiting jobs. This we do 
+    // mainly because when a new job is added, it cannot superceede a running 
+    // job, even though the latter may be a lower priority. If this is ever
+    // changed, we may get by with one collection. 
+    Collection<JobInProgress> waitingJobs;
+    Collection<JobInProgress> runningJobs;
+    
+    QueueInfo(boolean prio) {
+      this.supportsPriorities = prio;
+      if (supportsPriorities) {
+        this.waitingJobs = new TreeSet<JobInProgress>(PRIORITY_JOB_COMPARATOR);
+      }
+      else {
+        this.waitingJobs = new TreeSet<JobInProgress>(STARTTIME_JOB_COMPARATOR);
+      }
+      this.runningJobs = new LinkedList<JobInProgress>();
+    }
+
+    /**
+     * we need to delete an object from our TreeSet based on referential
+     * equality, rather than value equality that the TreeSet uses. 
+     * Another way to do this is to extend the TreeSet and override remove().
+     */
+    static private boolean removeOb(Collection<JobInProgress> c, Object o) {
+      Iterator<JobInProgress> i = c.iterator();
+      while (i.hasNext()) {
+          if (i.next() == o) {
+              i.remove();
+              return true;
+          }
+      }
+      return false;
+    }
+    
+  }
+  
+  // we maintain a hashmap of queue-names to queue info
+  private Map<String, QueueInfo> jobQueues = 
+    new HashMap<String, QueueInfo>();
+  private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
+  private CapacityTaskScheduler scheduler;
+
+  
+  JobQueuesManager(CapacityTaskScheduler s) {
+    this.scheduler = s;
+  }
+  
+  /**
+   * create an empty queue with the default comparator
+   * @param queueName The name of the queue
+   * @param supportsPriotities whether the queue supports priorities
+   */
+  public void createQueue(String queueName, boolean supportsPriotities) {
+    jobQueues.put(queueName, new QueueInfo(supportsPriotities));
+  }
+  
+  /**
+   * Returns the queue of running jobs associated with the name
+   */
+  public Collection<JobInProgress> getRunningJobQueue(String queueName) {
+    return jobQueues.get(queueName).runningJobs;
+  }
+  
+  /**
+   * Returns the queue of waiting jobs associated with the name
+   */
+  public Collection<JobInProgress> getWaitingJobQueue(String queueName) {
+    return jobQueues.get(queueName).waitingJobs;
+  }
+  
+  @Override
+  public void jobAdded(JobInProgress job) {
+    LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
+    // add job to the right queue
+    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    if (null == qi) {
+      // job was submitted to a queue we're not aware of
+      LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
+          " specified for job" + job.getProfile().getJobID() + 
+          ". Ignoring job.");
+      return;
+    }
+    // add job to waiting queue. It will end up in the right place, 
+    // based on priority. 
+    // We use our own version of removing objects based on referential
+    // equality, since the 'job' object has already been changed. 
+    qi.waitingJobs.add(job);
+    // let scheduler know. 
+    scheduler.jobAdded(job);
+  }
+
+  @Override
+  public void jobRemoved(JobInProgress job) {
+    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    if (null == qi) {
+      // can't find queue for job. Shouldn't happen. 
+      LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
+          " when removing job " + job.getProfile().getJobID());
+      return;
+    }
+    // job could be in running or waiting queue
+    if (!qi.runningJobs.remove(job)) {
+      QueueInfo.removeOb(qi.waitingJobs, job);
+    }
+    // let scheduler know
+    scheduler.jobRemoved(job);
+  }
+  
+  @Override
+  public void jobUpdated(JobInProgress job) {
+    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    if (null == qi) {
+      // can't find queue for job. Shouldn't happen. 
+      LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
+          " when updating job " + job.getProfile().getJobID());
+      return;
+    }
+    // this is called when a job's priority or state is changed.
+    // since we don't know the job's previous state, we need to 
+    // find out in which queue it was earlier, and then place it in the
+    // right queue.
+    Collection<JobInProgress> dest = (job.getStatus().getRunState() == 
+      JobStatus.PREP)? qi.waitingJobs: qi.runningJobs;
+    // We use our own version of removing objects based on referential
+    // equality, since the 'job' object has already been changed. 
+    if (!QueueInfo.removeOb(qi.waitingJobs, job)) {
+      qi.runningJobs.remove(job);
+    }
+    dest.add(job);
+  }
+  
+}