You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/09/11 20:49:26 UTC
svn commit: r694415 [1/2] - in /hadoop/core/trunk: conf/ src/contrib/
src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/
src/contrib/capacity-scheduler/src/java/
src/contrib/capacity-scheduler/src/java/org/ src/contrib/capacity-schedul...
Author: omalley
Date: Thu Sep 11 11:49:22 2008
New Revision: 694415
URL: http://svn.apache.org/viewvc?rev=694415&view=rev
Log:
HADOOP-3445. Add capacity scheduler that provides guaranteed capacities to
queues as a percentage of the cluster. (Vivek Ratan via omalley)
Added:
hadoop/core/trunk/conf/capacity-scheduler.xml.template
hadoop/core/trunk/src/contrib/capacity-scheduler/
hadoop/core/trunk/src/contrib/capacity-scheduler/README
hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml
hadoop/core/trunk/src/contrib/capacity-scheduler/src/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
Removed:
hadoop/core/trunk/conf/resource-manager-conf.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java
Modified:
hadoop/core/trunk/conf/ (props changed)
hadoop/core/trunk/src/contrib/build.xml
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
Propchange: hadoop/core/trunk/conf/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Sep 11 11:49:22 2008
@@ -3,3 +3,4 @@
slaves
masters
hadoop-env.sh
+capacity-scheduler.xml
Added: hadoop/core/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/conf/capacity-scheduler.xml.template (added)
+++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Thu Sep 11 11:49:22 2008
@@ -0,0 +1,50 @@
+<?xml version="1.0"?>
+
+<!-- This is the configuration file for the capacity scheduler in Hadoop. -->
+<!-- You can configure various scheduling parameters related to queues. -->
+<!-- The properties for a queue follow a naming convention, such as, -->
+<!-- mapred.capacity-scheduler.queue.<queue-name>.property-name. -->
+
+<configuration>
+
+ <property>
+ <name>mapred.capacity-scheduler.queue.default.guaranteed-capacity</name>
+ <value>100</value>
+ <description>Percentage of the number of slots in the cluster that are
+ guaranteed to be available for jobs in this queue.
+ </description>
+ </property>
+
+ <property>
+ <name>mapred.capacity-scheduler.queue.default.reclaim-time-limit</name>
+ <value>300</value>
+ <description>The amount of time, in seconds, before which
+ resources distributed to other queues will be reclaimed.
+ </description>
+ </property>
+
+ <property>
+ <name>mapred.capacity-scheduler.queue.default.supports-priority</name>
+ <value>false</value>
+ <description>If true, priorities of jobs will be taken into
+ account in scheduling decisions.
+ </description>
+ </property>
+
+ <property>
+ <name>mapred.capacity-scheduler.queue.default.minimum-user-limit-percent</name>
+ <value>100</value>
+ <description> Each queue enforces a limit on the percentage of resources
+ allocated to a user at any given time, if there is competition for them.
+ This user limit can vary between a minimum and maximum value. The former
+ depends on the number of users who have submitted jobs, and the latter is
+ set to this property value. For example, suppose the value of this
+ property is 25. If two users have submitted jobs to a queue, no single
+ user can use more than 50% of the queue resources. If a third user submits
+ a job, no single user can use more than 33% of the queue resources. With 4
+ or more users, no user can use more than 25% of the queue's resources. A
+ value of 100 implies no user limits are imposed.
+ </description>
+ </property>
+
+</configuration>
Modified: hadoop/core/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build.xml?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/build.xml (original)
+++ hadoop/core/trunk/src/contrib/build.xml Thu Sep 11 11:49:22 2008
@@ -48,6 +48,7 @@
<subant target="test">
<fileset dir="." includes="streaming/build.xml"/>
<fileset dir="." includes="fairscheduler/build.xml"/>
+ <fileset dir="." includes="capacity-scheduler/build.xml"/>
</subant>
</target>
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/README?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/README (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/README Thu Sep 11 11:49:22 2008
@@ -0,0 +1,135 @@
+# Copyright 2008 The Apache Software Foundation Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License. You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements a scheduler for Map-Reduce jobs, called Capacity
+Task Scheduler (or just Capacity Scheduler), which provides a way to share
+large clusters. The scheduler provides the following features (which are
+described in detail in HADOOP-3421):
+
+* Support for queues, where a job is submitted to a queue.
+* Queues are guaranteed a fraction of the capacity of the grid (their
+ 'guaranteed capacity') in the sense that a certain capacity of resources
+ will be at their disposal. All jobs submitted to the queues of an Org will
+ have access to the capacity guaranteed to the Org.
+* Free resources can be allocated to any queue beyond its guaranteed capacity.
+ These excess allocated resources can be reclaimed and made available to
+ another queue in order to meet its capacity guarantee.
+* The scheduler guarantees that excess resources taken from a queue will be
+ restored to it within N minutes of its need for them.
+* Queues optionally support job priorities (disabled by default).
+* Within a queue, jobs with higher priority will have access to the queue's
+ resources before jobs with lower priority. However, once a job is running, it
+ will not be preempted for a higher priority job.
+* In order to prevent one or more users from monopolizing its resources, each
+ queue enforces a limit on the percentage of resources allocated to a user at
+ any given time, if there is competition for them.
+* Support for memory-intensive jobs, wherein a job can optionally specify
+ higher memory-requirements than the default, and the tasks of the job will
+ only be run on TaskTrackers that have enough memory to spare.
+
+Whenever a TaskTracker is free, the Capacity Scheduler first picks the queue
+that needs to reclaim resources the earliest. If no such queue is found,
+it then picks the queue which has the most free space (the queue whose ratio
+of # of running slots to guaranteed capacity is the lowest).
+
+--------------------------------------------------------------------------------
+
+BUILDING:
+
+In HADOOP_HOME, run ant package to build Hadoop and its contrib packages.
+
+--------------------------------------------------------------------------------
+
+INSTALLING:
+
+To run the capacity scheduler in your Hadoop installation, you need to put it
+on the CLASSPATH. The easiest way is to copy the
+hadoop-*-capacity-scheduler.jar from
+HADOOP_HOME/build/contrib/capacity-scheduler to HADOOP_HOME/lib. Alternatively
+you can modify HADOOP_CLASSPATH to include this jar, in conf/hadoop-env.sh.
+
+You will also need to set the following property in the Hadoop config file
+(conf/hadoop-site.xml) to have Hadoop use the capacity scheduler:
+
+<property>
+ <name>mapred.jobtracker.taskScheduler</name>
+ <value>org.apache.hadoop.mapred.CapacityTaskScheduler</value>
+</property>
+
+--------------------------------------------------------------------------------
+
+CONFIGURATION:
+
+The following properties can be set in hadoop-site.xml to configure the
+scheduler:
+
+mapred.capacity-scheduler.reclaimCapacity.interval:
+ The capacity scheduler checks, every 'interval' seconds, whether any
+ capacity needs to be reclaimed. The default value is 5 seconds.
+
+The scheduling information for queues is maintained in a configuration file
+called 'capacity-scheduler.xml'. Note that the queue names are set in
+hadoop-site.xml. capacity-scheduler.xml sets the scheduling properties
+for each queue. See that file for configuration details, but the following
+are the configuration options for each queue:
+
+mapred.capacity-scheduler.queue.<queue-name>.guaranteed-capacity
+ Percentage of the number of slots in the cluster that are
+ guaranteed to be available for jobs in this queue.
+ The sum of guaranteed capacities for all queues should be less than or
+ equal to 100.
+
+mapred.capacity-scheduler.queue.<queue-name>.reclaim-time-limit
+ The amount of time, in seconds, before which resources distributed to other
+ queues will be reclaimed.
+
+mapred.capacity-scheduler.queue.<queue-name>.supports-priority
+ If true, priorities of jobs will be taken into account in scheduling
+ decisions.
+
+mapred.capacity-scheduler.queue.<queue-name>.minimum-user-limit-percent
+ Each queue enforces a limit on the percentage of resources
+ allocated to a user at any given time, if there is competition for them.
+ This user limit can vary between a minimum and maximum value. The former
+ depends on the number of users who have submitted jobs, and the latter is
+ set to this property value. For example, suppose the value of this
+ property is 25. If two users have submitted jobs to a queue, no single
+ user can use more than 50% of the queue resources. If a third user submits
+ a job, no single user can use more than 33% of the queue resources. With 4
+ or more users, no user can use more than 25% of the queue's resources. A
+ value of 100 implies no user limits are imposed.
+
+
+--------------------------------------------------------------------------------
+
+IMPLEMENTATION:
+
+When a TaskTracker is free, the capacity scheduler does the following (note
+that many of these steps can be, and will be, enhanced over time to provide
+better algorithms):
+1. Decide whether to give it a Map or Reduce task, depending on how many tasks
+the TT is already running of that type, with respect to the maximum tasks it
+can run.
+2. The scheduler then picks a queue. Queues that need to reclaim capacity
+sooner come before queues that don't. Queues that don't need to reclaim are
+ordered by a ratio of (# of running tasks)/Guaranteed capacity, which indicates
+how much 'free space' the queue has, or how much it is over capacity.
+3. A job is picked in the queue based on its state (running jobs are picked
+first), its priority (if the queue supports priorities) or its submission
+time, and whether the job's user is under or over limit.
+4. A task is picked from the job in the same way it always has.
+
+Periodically, a thread checks each queue to see if it needs to reclaim any
+capacity. Queues that are running below capacity and that have tasks waiting
+need to reclaim capacity within a certain period of time. If a queue hasn't
+received enough tasks in a certain amount of time, tasks will be killed from
+queues that are running over capacity.
+
+--------------------------------------------------------------------------------
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml Thu Sep 11 11:49:22 2008
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="capacity-scheduler" default="jar">
+
+ <import file="../build-contrib.xml"/>
+
+</project>
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,222 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Class providing access to resource manager configuration.
+ *
+ * Resource manager configuration involves setting up queues, and defining
+ * various properties for the queues. These are typically read from a file
+ * called resource-manager-conf.xml that must be in the classpath of the
+ * application. The class provides APIs to get/set and reload the
+ * configuration for the queues.
+ */
+class CapacitySchedulerConf {
+
+ /** Default file name from which the resource manager configuration is read. */
+ public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
+
+ /** Default value for guaranteed capacity of maps (as percentage).
+ * The default value is set to 100, to represent the entire queue.
+ */
+ public static final float DEFAULT_GUARANTEED_CAPACITY = 100;
+
+ /** Default value for reclaiming redistributed resources.
+ * The default value is set to <code>300</code>.
+ */
+ public static final int DEFAULT_RECLAIM_TIME_LIMIT = 300;
+
+ /** Default value for minimum resource limit per user per queue, as a
+ * percentage.
+ * The default value is set to <code>100</code>, the idea
+ * being that the default is suitable for organizations that do not
+ * require setting up any queues.
+ */
+ public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
+
+ private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX =
+ "mapred.capacity-scheduler.queue.";
+
+ private Configuration rmConf;
+
+ /**
+ * Create a new ResourceManagerConf.
+ * This method reads from the default configuration file mentioned in
+ * {@link RM_CONF_FILE}, that must be present in the classpath of the
+ * application.
+ */
+ public CapacitySchedulerConf() {
+ rmConf = new Configuration(false);
+ rmConf.addResource(SCHEDULER_CONF_FILE);
+ }
+
+ /**
+ * Create a new ResourceManagerConf reading the specified configuration
+ * file.
+ *
+ * @param configFile {@link Path} to the configuration file containing
+ * the resource manager configuration.
+ */
+ public CapacitySchedulerConf(Path configFile) {
+ rmConf = new Configuration(false);
+ rmConf.addResource(configFile);
+ }
+
+ /**
+ * Get the guaranteed percentage of the cluster for the specified queue.
+ *
+ * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY} if
+ * no value is specified in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return guaranteed percent of the cluster for the queue.
+ */
+ public float getGuaranteedCapacity(String queue) {
+ checkQueue(queue);
+ float result = rmConf.getFloat(toFullPropertyName(queue,
+ "guaranteed-capacity"),
+ DEFAULT_GUARANTEED_CAPACITY);
+ if (result < 0.0 || result > 100.0) {
+ throw new IllegalArgumentException("Illegal capacity for queue " + queue +
+ " of " + result);
+ }
+ return result;
+ }
+
+ /**
+ * Get the amount of time before which redistributed resources must be
+ * reclaimed for the specified queue.
+ *
+ * The resource manager distributes spare capacity from a free queue
+ * to ones which are in need for more resources. However, if a job
+ * submitted to the first queue requires back the resources, they must
+ * be reclaimed within the specified configuration time limit.
+ *
+ * This method defaults to {@link #DEFAULT_RECLAIM_TIME_LIMIT} if
+ * no value is specified in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return reclaim time limit for this queue.
+ */
+ public int getReclaimTimeLimit(String queue) {
+ checkQueue(queue);
+ return rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"),
+ DEFAULT_RECLAIM_TIME_LIMIT);
+ }
+
+ /**
+ * Set the amount of time before which redistributed resources must be
+ * reclaimed for the specified queue.
+ * @param queue Name of the queue
+ * @param value Amount of time before which the redistributed resources
+ * must be retained.
+ */
+ public void setReclaimTimeLimit(String queue, int value) {
+ checkQueue(queue);
+ rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
+ }
+
+ /**
+ * Get whether priority is supported for this queue.
+ *
+ * If this value is false, then job priorities will be ignored in
+ * scheduling decisions. This method defaults to <code>false</code> if
+ * the property is not configured for this queue. If the queue name is
+ * unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return Whether this queue supports priority or not.
+ */
+ public boolean isPrioritySupported(String queue) {
+ checkQueue(queue);
+ return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
+ false);
+ }
+
+ /**
+ * Set whether priority is supported for this queue.
+ *
+ * If the queue name is unknown, this method throws a
+ * {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @param value true, if the queue must support priorities, false otherwise.
+ */
+ public void setPrioritySupported(String queue, boolean value) {
+ checkQueue(queue);
+ rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+ }
+
+ /**
+ * Get the minimum limit of resources for any user submitting jobs in
+ * this queue, in percentage.
+ *
+ * This method defaults to {@link #DEFAULT_MIN_USER_LIMIT_PERCENT} if
+ * no value is specified in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return minimum limit of resources, in percentage, that will be
+ * available for a user.
+ *
+ */
+ public int getMinimumUserLimitPercent(String queue) {
+ checkQueue(queue);
+ return rmConf.getInt(toFullPropertyName(queue,
+ "minimum-user-limit-percent"),
+ DEFAULT_MIN_USER_LIMIT_PERCENT);
+ }
+
+ /**
+ * Set the minimum limit of resources for any user submitting jobs in
+ * this queue, in percentage.
+ *
+ * If the queue name is unknown, this method throws a
+ * {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @param value minimum limit of resources for any user submitting jobs
+ * in this queue
+ */
+ public void setMinimumUserLimitPercent(String queue, int value) {
+ checkQueue(queue);
+ rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"),
+ value);
+ }
+
+ /**
+ * Reload configuration by clearing the information read from the
+ * underlying configuration file.
+ */
+ public synchronized void reloadConfiguration() {
+ rmConf.reloadConfiguration();
+ }
+
+ private synchronized void checkQueue(String queue) {
+ /*if (queues == null) {
+ queues = getQueues();
+ }
+ if (!queues.contains(queue)) {
+ throw new IllegalArgumentException("Queue " + queue + " is undefined.");
+ }*/
+ }
+
+ private static final String toFullPropertyName(String queue,
+ String property) {
+ return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
+ }
+}
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,1068 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
+ * and provides a HOD-less way to share large clusters. This scheduler
+ * provides the following features:
+ * * support for queues, where a job is submitted to a queue.
+ * * Queues are guaranteed a fraction of the capacity of the grid (their
+ * 'guaranteed capacity') in the sense that a certain capacity of resources
+ * will be at their disposal. All jobs submitted to the queues of an Org
+ * will have access to the capacity guaranteed to the Org.
+ * * Free resources can be allocated to any queue beyond its guaranteed
+ * capacity. These excess allocated resources can be reclaimed and made
+ * available to another queue in order to meet its capacity guarantee.
+ * * The scheduler guarantees that excess resources taken from a queue will
+ * be restored to it within N minutes of its need for them.
+ * * Queues optionally support job priorities (disabled by default).
+ * * Within a queue, jobs with higher priority will have access to the
+ * queue's resources before jobs with lower priority. However, once a job
+ * is running, it will not be preempted for a higher priority job.
+ * * In order to prevent one or more users from monopolizing its resources,
+ * each queue enforces a limit on the percentage of resources allocated to a
+ * user at any given time, if there is competition for them.
+ *
+ */
+class CapacityTaskScheduler extends TaskScheduler {
+
+  /**
+   * Tracks capacity that a queue is in the process of reclaiming.
+   * One of these objects is created whenever slots need to be reclaimed.
+   * As the queue receives slots, the remaining amount is decremented; if
+   * not enough has been reclaimed in time, tasks need to be killed. The
+   * object 'expires' either when all resources are reclaimed before the
+   * deadline, or when the deadline passes.
+   */
+  private static class ReclaimedResource {
+    /** Amount of resource to reclaim, as given at creation time. */
+    public int originalAmount;
+    /** Amount of resource that is still left to reclaim. */
+    public int currentAmount;
+    /**
+     * Time, in milliseconds, at which this object expires. Intended to be
+     * the creation time plus the queue's reclaim-time SLA.
+     */
+    public long whenToExpire;
+    /**
+     * Time, in milliseconds, at which tasks should be killed if enough has
+     * not yet been reclaimed. It lies before 'whenToExpire' and is stored
+     * here so it need not be recomputed every time.
+     */
+    public long whenToKill;
+    /** Whether tasks have already been killed on behalf of this object. */
+    boolean tasksKilled = false;
+
+    public ReclaimedResource(int amount, long expiryTime, long killTime) {
+      originalAmount = amount;
+      currentAmount = originalAmount;
+      whenToExpire = expiryTime;
+      whenToKill = killTime;
+    }
+  }
+
+  /**
+   * This class keeps track of scheduling info for each queue for either
+   * Map or Reduce tasks.
+   * This scheduling information is used by the JT to decide how to allocate
+   * tasks, redistribute capacity, etc.
+   */
+  private static class QueueSchedulingInfo {
+    /** name of the queue this information belongs to */
+    String queueName;
+
+    /** guaranteed capacity(%) is set at config time */
+    float guaranteedCapacityPercent = 0;
+    /**
+     * the actual gc, which depends on how many slots are available
+     * in the cluster at any given time.
+     */
+    int guaranteedCapacity = 0;
+
+    /*
+     * we also keep track of how many tasks are running for all jobs in
+     * the queue, and how many overall tasks there are. This info is
+     * available for each job, but keeping a sum makes our algos faster.
+     */
+    /** number of running tasks */
+    int numRunningTasks = 0;
+    /** number of pending tasks */
+    int numPendingTasks = 0;
+
+    /**
+     * to handle user limits, we need to know how many users have jobs in
+     * the queue.
+     */
+    Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
+    /** for each user, we need to keep track of number of running tasks */
+    Map<String, Integer> numRunningTasksByUser =
+      new HashMap<String, Integer>();
+
+    /** min value of user limit (same for all users) */
+    int ulMin;
+
+    /*
+     * We need to keep track of resources to reclaim.
+     * Whenever a queue is under capacity and has tasks pending, we offer it
+     * an SLA that gives it free slots equal to or greater than the gap in
+     * its capacity, within a period of time (reclaimTime).
+     * To do this, we periodically check if queues need to reclaim capacity.
+     * If they do, we create a ReclaimedResource object. We also periodically
+     * check if a queue has received enough free slots within, say, 80% of
+     * its reclaimTime. If not, we kill enough tasks to make up the
+     * difference.
+     * We keep two queues of ReclaimedResource objects. When an object is
+     * created, it is placed in one queue. Once we kill tasks to recover
+     * resources for that object, it is placed in an expiry queue. We need
+     * to do this to prevent creating spurious ReclaimedResource objects. We
+     * keep a count of total resources that are being reclaimed. This count
+     * is decremented when an object expires.
+     */
+
+    /**
+     * reclaim time limit (in msec). This time represents the SLA we offer
+     * a queue - a queue gets back any lost capacity within this period
+     * of time.
+     */
+    long reclaimTime;
+    /**
+     * the list of resources to reclaim. This list is always sorted so that
+     * resources that need to be reclaimed sooner occur earlier in the list.
+     */
+    LinkedList<ReclaimedResource> reclaimList =
+      new LinkedList<ReclaimedResource>();
+    /**
+     * the list of resources to expire. This list is always sorted so that
+     * resources that need to be expired sooner occur earlier in the list.
+     */
+    LinkedList<ReclaimedResource> reclaimExpireList =
+      new LinkedList<ReclaimedResource>();
+    /**
+     * sum of all resources that are being reclaimed.
+     * We keep this to prevent unnecessary ReclaimedResource objects from
+     * being created.
+     */
+    int numReclaimedResources = 0;
+
+    /**
+     * @param queueName name of the queue
+     * @param guaranteedCapacity guaranteed capacity of the queue, as a
+     *                           percentage (stored in
+     *                           guaranteedCapacityPercent)
+     * @param ulMin minimum user limit, the same for all users
+     * @param reclaimTime reclaim time limit, in msec
+     */
+    public QueueSchedulingInfo(String queueName, float guaranteedCapacity,
+        int ulMin, long reclaimTime) {
+      // Strings are immutable, so the caller's instance can be shared;
+      // no defensive copy is needed.
+      this.queueName = queueName;
+      this.guaranteedCapacityPercent = guaranteedCapacity;
+      this.ulMin = ulMin;
+      this.reclaimTime = reclaimTime;
+    }
+  }
+
+  /**
+   * This class handles the scheduling algorithms.
+   * The algos are the same for both Map and Reduce tasks; subclasses supply
+   * the task-type specific pieces (obtaining, counting and killing tasks).
+   * There may be slight variations later, in which case more of this class
+   * can move into the subclasses.
+   */
+  private static abstract class TaskSchedulingMgr {
+
+    /** quick way to get qsi object given a queue name */
+    private Map<String, QueueSchedulingInfo> queueInfoMap =
+      new HashMap<String, QueueSchedulingInfo>();
+    /** we keep track of the number of map or reduce slots we saw last */
+    private int numSlots = 0;
+    /** our enclosing TaskScheduler object */
+    protected CapacityTaskScheduler scheduler;
+    /** task type handled by this manager; used in debug output */
+    protected String type = null;
+
+    /** Gets a new task of this manager's type from job, or null if none. */
+    abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
+        JobInProgress job) throws IOException;
+    /** Total number of slots of this manager's type in the cluster. */
+    abstract int getClusterCapacity();
+    /** Number of running tasks of this manager's type in job. */
+    abstract int getRunningTasks(JobInProgress job);
+    /** Number of pending tasks of this manager's type in job. */
+    abstract int getPendingTasks(JobInProgress job);
+    /**
+     * Kills up to tasksToKill running tasks of this manager's type from job.
+     * @return the number of tasks actually killed
+     */
+    abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
+
+    /**
+     * List of QSIs for assigning tasks.
+     * This list is ordered such that queues that need to reclaim capacity
+     * sooner, come before queues that don't. For queues that don't, they're
+     * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which
+     * indicates how much 'free space' the queue has, or how much it is over
+     * capacity. This ordered list is iterated over, when assigning tasks.
+     */
+    private List<QueueSchedulingInfo> qsiForAssigningTasks =
+      new ArrayList<QueueSchedulingInfo>();
+
+    /** comparator to sort queues */
+    private static final class QueueComparator
+        implements Comparator<QueueSchedulingInfo> {
+      public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
+        boolean q1Reclaims = !q1.reclaimList.isEmpty();
+        boolean q2Reclaims = !q2.reclaimList.isEmpty();
+        // if one queue needs to reclaim something and the other one doesn't,
+        // the former is first
+        if (q1Reclaims != q2Reclaims) {
+          return q1Reclaims ? -1 : 1;
+        }
+        if (!q1Reclaims) {
+          // neither needs to reclaim. look at how much capacity they've
+          // filled. Double.compare() gives a total order even when a queue
+          // has a guaranteed capacity of 0 (ratio is NaN or Infinity);
+          // plain '<'/'>' would report NaN as equal to everything, which
+          // breaks the transitivity Collections.sort() relies on.
+          double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
+          double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
+          return Double.compare(r1, r2);
+        }
+        // both have to reclaim. Look at which one needs to reclaim earlier
+        long t1 = q1.reclaimList.getFirst().whenToKill;
+        long t2 = q2.reclaimList.getFirst().whenToKill;
+        return (t1 < t2) ? -1 : ((t1 > t2) ? 1 : 0);
+      }
+    }
+    private final static QueueComparator queueComparator = new QueueComparator();
+
+
+    TaskSchedulingMgr(CapacityTaskScheduler sched) {
+      scheduler = sched;
+    }
+
+    private void add(QueueSchedulingInfo qsi) {
+      queueInfoMap.put(qsi.queueName, qsi);
+      qsiForAssigningTasks.add(qsi);
+    }
+    private int getNumQueues() {
+      return queueInfoMap.size();
+    }
+    private boolean isQueuePresent(String queueName) {
+      return queueInfoMap.containsKey(queueName);
+    }
+
+    /** Returns the count stored for key, or 0 if there is no entry. */
+    private static int getCount(Map<String, Integer> counts, String key) {
+      Integer count = counts.get(key);
+      return (null == count) ? 0 : count.intValue();
+    }
+
+    /**
+     * Periodically, we walk through our queues to do the following:
+     * a. Check if a queue needs to reclaim any resources within a period
+     *    of time (because it's running below capacity and more tasks are
+     *    waiting)
+     * b. Check if a queue hasn't received enough of the resources it needed
+     *    to be reclaimed and thus tasks need to be killed.
+     */
+    private synchronized void reclaimCapacity() {
+      int tasksToKill = 0;
+      // with only one queue, there's nothing to do
+      if (queueInfoMap.size() < 2) {
+        return;
+      }
+      // qsiForAssigningTasks is sorted by updateCollectionOfQSIs(); its last
+      // queue is the one least in need of capacity
+      QueueSchedulingInfo lastQsi =
+        qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
+      long currentTime = scheduler.clock.getTime();
+      for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+        // is there any resource that needs to be reclaimed?
+        if ((!qsi.reclaimList.isEmpty()) &&
+            (qsi.reclaimList.getFirst().whenToKill <
+              currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) {
+          // make a note of how many tasks to kill to claim resources
+          tasksToKill += qsi.reclaimList.getFirst().currentAmount;
+          // move this to expiry list
+          ReclaimedResource r = qsi.reclaimList.remove();
+          qsi.reclaimExpireList.add(r);
+        }
+        // is there any resource that needs to be expired?
+        if ((!qsi.reclaimExpireList.isEmpty()) &&
+            (qsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
+          ReclaimedResource r = qsi.reclaimExpireList.remove();
+          qsi.numReclaimedResources -= r.originalAmount;
+        }
+        // do we need to reclaim a resource later?
+        // if no queue is over capacity, there's nothing to reclaim
+        if (lastQsi.numRunningTasks <= lastQsi.guaranteedCapacity) {
+          continue;
+        }
+        if (qsi.numRunningTasks < qsi.guaranteedCapacity) {
+          // usedCap is how much capacity is currently accounted for
+          int usedCap = qsi.numRunningTasks + qsi.numReclaimedResources;
+          // pending tasks not already covered by an earlier reclaim request
+          int uncoveredPending = qsi.numPendingTasks - qsi.numReclaimedResources;
+          // see if we have remaining capacity and if we have enough pending
+          // tasks to use up remaining capacity
+          if ((usedCap < qsi.guaranteedCapacity) && (uncoveredPending > 0)) {
+            // create a request for resources to be reclaimed
+            int amt = Math.min((qsi.guaranteedCapacity-usedCap),
+                uncoveredPending);
+            // create a resource object that needs to be reclaimed some time
+            // in the future. Killing starts a few heartbeats before the
+            // reclaim deadline.
+            long whenToKill = qsi.reclaimTime -
+              (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING *
+               scheduler.taskTrackerManager.getNextHeartbeatInterval());
+            if (whenToKill < 0) whenToKill = 0;
+            qsi.reclaimList.add(new ReclaimedResource(amt,
+                currentTime + qsi.reclaimTime,
+                currentTime + whenToKill));
+            qsi.numReclaimedResources += amt;
+            LOG.debug("Queue " + qsi.queueName + " needs to reclaim " +
+                amt + " resources");
+          }
+        }
+      }
+      // kill tasks to reclaim capacity
+      if (0 != tasksToKill) {
+        killTasks(tasksToKill);
+      }
+    }
+
+    /**
+     * Kills 'tasksToKill' tasks in total from the queues running over
+     * capacity.
+     *
+     * There are a number of fair ways in which one can figure out how
+     * many tasks to kill from which queue, so that the total number of
+     * tasks killed is equal to 'tasksToKill'.
+     * Maybe the best way is to keep a global ordering of running tasks
+     * and kill the ones that ran last, irrespective of what queue or
+     * job they belong to.
+     * What we do here is look at how many tasks each queue is running
+     * over capacity, and use that as a weight to decide how many tasks
+     * to kill from that queue.
+     */
+    private void killTasks(int tasksToKill)
+    {
+      // Find the total number of tasks over capacity. Every queue is
+      // examined instead of relying on list order: the list may not have
+      // been re-sorted since reclaim entries last moved, so an over-cap
+      // queue is not guaranteed to sit after all under-cap ones.
+      int tasksOverCap = 0;
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        if (qsi.numRunningTasks > qsi.guaranteedCapacity) {
+          tasksOverCap += (qsi.numRunningTasks - qsi.guaranteedCapacity);
+        }
+      }
+      // if some queue needs to reclaim cap, there must be at least one queue
+      // over cap. But check, just in case.
+      if (0 == tasksOverCap) {
+        LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill +
+            " tasks but there is no queue over capacity.");
+        return;
+      }
+      // now kill tasks from each queue, in proportion to its excess
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        int overCap = qsi.numRunningTasks - qsi.guaranteedCapacity;
+        if (overCap > 0) {
+          killTasksFromQueue(qsi, (int)Math.round(
+              ((double)overCap)*tasksToKill/(double)tasksOverCap));
+        }
+      }
+    }
+
+    /**
+     * Kills 'tasksToKill' tasks from the queue represented by qsi.
+     * We start killing as many tasks as possible from the jobs that started
+     * last. This way, we let long-running jobs complete faster.
+     */
+    private void killTasksFromQueue(QueueSchedulingInfo qsi, int tasksToKill) {
+      // The proportional share computed by killTasks() can round down to 0.
+      // Never ask a job to kill zero (or a negative number of) tasks.
+      if (tasksToKill <= 0) {
+        return;
+      }
+      int tasksKilled = 0;
+      JobInProgress jobs[] = scheduler.jobQueuesManager.
+        getRunningJobQueue(qsi.queueName).toArray(new JobInProgress[0]);
+      for (int i=jobs.length-1; i>=0; i--) {
+        if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) {
+          continue;
+        }
+        tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled);
+        if (tasksKilled >= tasksToKill) break;
+      }
+    }
+
+    /**
+     * Returns the TaskAttemptID of the running task, if any, that has made
+     * the least progress. Attempts reporting a progress of 1 or more are
+     * never chosen. Returns null if there is no eligible attempt.
+     */
+    TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) {
+      double leastProgress = 1;
+      TaskAttemptID tID = null;
+      for (TaskAttemptID taskid: tip.getActiveTasks().keySet()) {
+        TaskStatus status = tip.getTaskStatus(taskid);
+        if ((status.getRunState() == TaskStatus.State.RUNNING) &&
+            (status.getProgress() < leastProgress)) {
+          leastProgress = status.getProgress();
+          tID = taskid;
+        }
+      }
+      return tID;
+    }
+
+
+    /**
+     * Update individual QSI objects.
+     * We don't need exact information for all variables, just enough for us
+     * to make scheduling decisions. For example, we don't need an exact count
+     * of numRunningTasks. Once we count upto the grid capacity (gcSum), any
+     * number beyond that will make no difference.
+     */
+    private synchronized void updateQSIObjects() {
+      // has the total number of TT slots changed since last time?
+      int clusterCapacity = getClusterCapacity();
+      boolean slotsChanged = (clusterCapacity != numSlots);
+      numSlots = clusterCapacity;
+      for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+        if (slotsChanged) {
+          // Recompute the GC from its percentage. Adding truncated
+          // increments per change would accumulate rounding error.
+          qsi.guaranteedCapacity =
+            (int)(qsi.guaranteedCapacityPercent*numSlots/100);
+        }
+        qsi.numRunningTasks = 0;
+        qsi.numPendingTasks = 0;
+        for (String s: qsi.numRunningTasksByUser.keySet()) {
+          qsi.numRunningTasksByUser.put(s, 0);
+        }
+        // update stats on running jobs
+        for (JobInProgress j:
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+          if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+            continue;
+          }
+          String user = j.getProfile().getUser();
+          int runningTasks = getRunningTasks(j);
+          qsi.numRunningTasks += runningTasks;
+          // a user without an entry is counted from 0 rather than unboxing null
+          qsi.numRunningTasksByUser.put(user,
+              getCount(qsi.numRunningTasksByUser, user) + runningTasks);
+          qsi.numPendingTasks += getPendingTasks(j);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("updateQSI: job " + j.toString() + ": run(m) = " +
+                j.runningMaps() + ", run(r) = " + j.runningReduces() +
+                ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
+                j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
+                ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
+                j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
+                + ", total(m) = " + j.numMapTasks + ", total(r) = " +
+                j.numReduceTasks);
+          }
+          /*
+           * it's fine walking down the entire list of running jobs - there
+           * probably will not be many, plus, we may need to go through the
+           * list to compute numRunningTasksByUser. If this is expensive, we
+           * can keep a list of running jobs per user. Then we only need to
+           * consider the first few jobs per user.
+           */
+        }
+        // update stats on waiting jobs
+        for (JobInProgress j:
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
+          // pending tasks
+          if (qsi.numPendingTasks > numSlots) {
+            // that's plenty. no need for more computation
+            break;
+          }
+          qsi.numPendingTasks += getPendingTasks(j);
+        }
+      }
+    }
+
+
+    /** Records that job was added, updating its user's job count. */
+    void jobAdded(JobInProgress job) {
+      QueueSchedulingInfo qsi =
+        queueInfoMap.get(job.getProfile().getQueueName());
+      // qsi shouldn't be null
+      if (null == qsi) {
+        LOG.warn("No scheduling info for queue " +
+            job.getProfile().getQueueName());
+        return;
+      }
+      // update user-specific info
+      String user = job.getProfile().getUser();
+      Integer i = qsi.numJobsByUser.get(user);
+      if (null == i) {
+        qsi.numJobsByUser.put(user, 1);
+        qsi.numRunningTasksByUser.put(user, 0);
+      }
+      else {
+        // Integer is immutable: the new count must be stored back in the map
+        qsi.numJobsByUser.put(user, i + 1);
+      }
+    }
+
+    /**
+     * Records that job was removed. Drops all per-user state when it was
+     * the user's last job.
+     */
+    void jobRemoved(JobInProgress job) {
+      QueueSchedulingInfo qsi =
+        queueInfoMap.get(job.getProfile().getQueueName());
+      // qsi shouldn't be null
+      if (null == qsi) {
+        LOG.warn("No scheduling info for queue " +
+            job.getProfile().getQueueName());
+        return;
+      }
+      // update numJobsByUser
+      String user = job.getProfile().getUser();
+      LOG.debug("JOb to be removed for user " + user);
+      Integer i = qsi.numJobsByUser.get(user);
+      if ((null == i) || (i.intValue() <= 1)) {
+        qsi.numJobsByUser.remove(user);
+        qsi.numRunningTasksByUser.remove(user);
+        LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
+      }
+      else {
+        // Integer is immutable: the new count must be stored back in the map
+        qsi.numJobsByUser.put(user, i - 1);
+        LOG.debug("User still has jobs, number of users = " + qsi.numJobsByUser.size());
+      }
+    }
+
+    // called when a task is allocated to queue represented by qsi.
+    // update our info about reclaimed resources
+    private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
+      // if we needed to reclaim resources, we have reclaimed one
+      if (qsi.reclaimList.isEmpty()) {
+        return;
+      }
+      ReclaimedResource res = qsi.reclaimList.getFirst();
+      res.currentAmount--;
+      if (0 == res.currentAmount) {
+        // move this resource to the expiry list
+        ReclaimedResource r = qsi.reclaimList.remove();
+        qsi.reclaimExpireList.add(r);
+      }
+    }
+
+    /** Sorts the queues into task-assignment order (see QueueComparator). */
+    private synchronized void updateCollectionOfQSIs() {
+      Collections.sort(qsiForAssigningTasks, queueComparator);
+    }
+
+
+    /** Returns true if the user owning j has reached the per-user limit. */
+    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+      String user = j.getProfile().getUser();
+      int numUsers = qsi.numJobsByUser.size();
+      if (0 == numUsers) {
+        // no users are tracked, so there is no per-user share to enforce
+        return false;
+      }
+      // what is our current capacity? It's GC if we're running below GC.
+      // If we're running over GC, then its #running plus 1 (which is the
+      // extra slot we're getting).
+      int currentCapacity;
+      if (qsi.numRunningTasks < qsi.guaranteedCapacity) {
+        currentCapacity = qsi.guaranteedCapacity;
+      }
+      else {
+        currentCapacity = qsi.numRunningTasks+1;
+      }
+      // a user may use an equal share of the capacity, but never less than
+      // ulMin percent of it
+      int limit = Math.max((int)(Math.ceil((double)currentCapacity/
+          (double)numUsers)),
+          (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
+      int userRunningTasks = getCount(qsi.numRunningTasksByUser, user);
+      if (userRunningTasks >= limit) {
+        LOG.debug("User " + user +
+            " is over limit, num running tasks = " + userRunningTasks +
+            ", limit = " + limit);
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * Picks a task for taskTracker from the queue represented by qsi, or
+     * returns null if the queue has nothing to run.
+     */
+    private Task getTaskFromQueue(TaskTrackerStatus taskTracker,
+        QueueSchedulingInfo qsi) throws IOException {
+      Task t = null;
+      // keep track of users over limit
+      Set<String> usersOverLimit = new HashSet<String>();
+      // look at running jobs first
+      for (JobInProgress j:
+        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+        // some jobs may be in the running queue but may have completed
+        // and not yet have been removed from the running queue
+        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+          continue;
+        }
+        // is this job's user over limit?
+        if (isUserOverLimit(j, qsi)) {
+          // user over limit.
+          usersOverLimit.add(j.getProfile().getUser());
+          continue;
+        }
+        // We found a suitable job. Get task from it.
+        t = obtainNewTask(taskTracker, j);
+        if (t != null) {
+          LOG.debug("Got task from job " +
+              j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+          return t;
+        }
+      }
+
+      // if we're here, we found nothing in the running jobs. Time to
+      // look at waiting jobs. Get first job of a user that is not over limit.
+      // We iterate over a copy: jobUpdated() may move the job out of the
+      // waiting queue, and changing a collection while iterating it is unsafe.
+      for (JobInProgress j: new ArrayList<JobInProgress>(
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName))) {
+        // is this job's user over limit?
+        if (usersOverLimit.contains(j.getProfile().getUser())) {
+          // user over limit.
+          continue;
+        }
+        // this job is a candidate for running. Initialize it, move it
+        // to run queue
+        j.initTasks();
+        scheduler.jobQueuesManager.jobUpdated(j);
+        // We found a suitable job. Get task from it.
+        t = obtainNewTask(taskTracker, j);
+        if (t != null) {
+          LOG.debug("Getting task from job " +
+              j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+          return t;
+        }
+      }
+
+      // if we're here, we haven't found anything. This could be because
+      // there is nothing to run, or that the user limit for some user is
+      // too strict, i.e., there's at least one user who doesn't have
+      // enough tasks to satisfy his limit. If it's the latter case, look at
+      // jobs without considering user limits, and get task from first
+      // eligible job
+      if (usersOverLimit.size() > 0) {
+        for (JobInProgress j:
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+          if ((j.getStatus().getRunState() == JobStatus.RUNNING) &&
+              (usersOverLimit.contains(j.getProfile().getUser()))) {
+            t = obtainNewTask(taskTracker, j);
+            if (t != null) {
+              LOG.debug("Getting task from job " +
+                  j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+              return t;
+            }
+          }
+        }
+        // look at waiting jobs the same way (again over a copy, see above)
+        for (JobInProgress j: new ArrayList<JobInProgress>(
+            scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName))) {
+          if (usersOverLimit.contains(j.getProfile().getUser())) {
+            j.initTasks();
+            scheduler.jobQueuesManager.jobUpdated(j);
+            t = obtainNewTask(taskTracker, j);
+            if (t != null) {
+              LOG.debug("Getting task from job " +
+                  j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName);
+              return t;
+            }
+          }
+        }
+      }
+
+      return null;
+    }
+
+    /**
+     * Assigns at most one task to taskTracker, trying queues in priority
+     * order. Returns a singleton list, or null if there is nothing to give.
+     */
+    private List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+      Task t = null;
+
+      /*
+       * update all our QSI objects.
+       * This involves updating each qsi structure. This operation depends
+       * on the number of running jobs in a queue, and some waiting jobs. If it
+       * becomes expensive, do it once every few heartbeats only.
+       */
+      updateQSIObjects();
+      LOG.debug("After updating QSI objects:");
+      printQSIs();
+      /*
+       * sort list of queues first, as we want queues that need the most to
+       * get first access. If this is expensive, sort every few heartbeats.
+       * We're only sorting a collection of queues - there shouldn't be many.
+       */
+      updateCollectionOfQSIs();
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        t = getTaskFromQueue(taskTracker, qsi);
+        if (t!= null) {
+          // we have a task. Update reclaimed resource info
+          updateReclaimedResources(qsi);
+          return Collections.singletonList(t);
+        }
+      }
+
+      // nothing to give
+      return null;
+    }
+
+    /** Logs a one-line summary of every queue at debug level. */
+    private void printQSIs() {
+      // building the summary walks every queue; skip it unless it is logged
+      if (!LOG.isDebugEnabled()) {
+        return;
+      }
+      StringBuilder s = new StringBuilder();
+      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        Collection<JobInProgress> runJobs =
+          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+        Collection<JobInProgress> waitJobs =
+          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName);
+        s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" +
+            qsi.numRunningTasks + ", gc=" + qsi.guaranteedCapacity +
+            ", wait=" + qsi.numPendingTasks + ", run jobs="+ runJobs.size() +
+            ", wait jobs=" + waitJobs.size() + "*** ");
+      }
+      LOG.debug(s);
+    }
+
+  }
+
+  /**
+   * The scheduling algorithms for map tasks.
+   */
+  private static class MapSchedulingMgr extends TaskSchedulingMgr {
+    MapSchedulingMgr(CapacityTaskScheduler dad) {
+      super(dad);
+      type = "map";
+    }
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
+        throws IOException {
+      ClusterStatus clusterStatus =
+        scheduler.taskTrackerManager.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+      return job.obtainNewMapTask(taskTracker, numTaskTrackers,
+          scheduler.taskTrackerManager.getNumberOfUniqueHosts());
+    }
+    int getClusterCapacity() {
+      return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks();
+    }
+    int getRunningTasks(JobInProgress job) {
+      return job.runningMaps();
+    }
+    int getPendingTasks(JobInProgress job) {
+      return job.pendingMaps();
+    }
+
+    /** Kills the least-progressed running attempt of tip; true on success. */
+    private boolean killLeastProgressedAttempt(TaskInProgress tip) {
+      TaskAttemptID tid = getRunningTaskWithLeastProgress(tip);
+      return (null != tid) && tip.killTask(tid, false);
+    }
+
+    int killTasksFromJob(JobInProgress job, int tasksToKill) {
+      /*
+       * We'd like to kill tasks that ran the last, or that have made the
+       * least progress.
+       * Ideally, each job would have a list of tasks, sorted by start
+       * time or progress. That's a lot of state to keep, however.
+       * For now, we do something a little different. We first try and kill
+       * non-local tasks, as these can be run anywhere. For each TIP, we
+       * kill the task that has made the least progress, if the TIP has
+       * more than one active task.
+       * We then look at tasks in runningMapCache.
+       */
+      int tasksKilled = 0;
+      // the loops below kill before comparing counts, so a request for no
+      // kills must be answered up front
+      if (tasksToKill <= 0) {
+        return tasksKilled;
+      }
+
+      /*
+       * For non-local running maps, we 'cheat' a bit. We know that the set
+       * of non-local running maps has an insertion order such that tasks
+       * that ran last are at the end. So we iterate through the set in
+       * reverse. This is OK because even if the implementation changes,
+       * we're still using generic set iteration and are no worse off.
+       */
+      TaskInProgress[] tips =
+        job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]);
+      for (int i=tips.length-1; i>=0; i--) {
+        // pick the task attempt that has progressed least
+        if (killLeastProgressedAttempt(tips[i]) &&
+            (++tasksKilled >= tasksToKill)) {
+          return tasksKilled;
+        }
+      }
+      // now look at other running tasks
+      for (Set<TaskInProgress> s: job.getRunningMapCache().values()) {
+        for (TaskInProgress tip: s) {
+          if (killLeastProgressedAttempt(tip) &&
+              (++tasksKilled >= tasksToKill)) {
+            return tasksKilled;
+          }
+        }
+      }
+      return tasksKilled;
+    }
+
+  }
+
+  /**
+   * The scheduling algorithms for reduce tasks.
+   */
+  private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
+    ReduceSchedulingMgr(CapacityTaskScheduler dad) {
+      super(dad);
+      type = "reduce";
+    }
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
+        throws IOException {
+      ClusterStatus clusterStatus =
+        scheduler.taskTrackerManager.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+      return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+          scheduler.taskTrackerManager.getNumberOfUniqueHosts());
+    }
+    int getClusterCapacity() {
+      return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+    }
+    int getRunningTasks(JobInProgress job) {
+      return job.runningReduces();
+    }
+    int getPendingTasks(JobInProgress job) {
+      return job.pendingReduces();
+    }
+    int killTasksFromJob(JobInProgress job, int tasksToKill) {
+      /*
+       * For reduces, we 'cheat' a bit. We know that the set
+       * of running reduces has an insertion order such that tasks
+       * that ran last are at the end. So we iterate through the set in
+       * reverse. This is OK because even if the implementation changes,
+       * we're still using generic set iteration and are no worse off.
+       */
+      int tasksKilled = 0;
+      // the loop below kills before comparing counts, so a request for no
+      // kills must be answered up front
+      if (tasksToKill <= 0) {
+        return tasksKilled;
+      }
+      TaskInProgress[] tips =
+        job.getRunningReduces().toArray(new TaskInProgress[0]);
+      for (int i=tips.length-1; i>=0; i--) {
+        // pick the task attempt that has progressed least
+        TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
+        if ((null != tid) && tips[i].killTask(tid, false) &&
+            (++tasksKilled >= tasksToKill)) {
+          return tasksKilled;
+        }
+      }
+      return tasksKilled;
+    }
+  }
+
+ /** the scheduling mgrs for Map and Reduce tasks */
+ // NOTE(review): these initializers run before the constructor body, so
+ // the managers see a scheduler whose jobQueuesManager and clock are not set yet.
+ protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
+ protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
+
+ /** name of the default queue; start() fails if it is not configured. */
+ static final String DEFAULT_QUEUE_NAME = "default";
+
+ /**
+ * How often the reclaim-capacity thread runs (in msecs). Set by start()
+ * from mapred.capacity-scheduler.reclaimCapacity.interval (in seconds).
+ * NOTE(review): static but assigned per instance in start().
+ */
+ private static long RECLAIM_CAPACITY_INTERVAL;
+ /** we start killing tasks to reclaim capacity when we have so many
+ * heartbeats left before a queue's reclaim deadline. */
+ private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
+
+ private static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
+ /** tracks waiting and running jobs per queue; also our job listener */
+ protected JobQueuesManager jobQueuesManager;
+ /** per-queue scheduler settings; created in start() if not set earlier */
+ protected CapacitySchedulerConf rmConf;
+ /** whether scheduler has started or not */
+ private boolean started = false;
+
+ /**
+ * Used to distribute/reclaim excess capacity among queues
+ */
+ class ReclaimCapacity implements Runnable {
+ public ReclaimCapacity() {
+ }
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(RECLAIM_CAPACITY_INTERVAL);
+ if (stopReclaim) {
+ break;
+ }
+ reclaimCapacity();
+ } catch (InterruptedException t) {
+ break;
+ } catch (Throwable t) {
+ LOG.error("Error in redistributing capacity:\n" +
+ StringUtils.stringifyException(t));
+ }
+ }
+ }
+ }
+  /** the thread running ReclaimCapacity; created in start() */
+  private Thread reclaimCapacityThread = null;
+  /**
+   * Variable to indicate that the reclaim thread should stop. It is written
+   * by terminate()/start() under the scheduler's lock but read by the
+   * reclaim thread without it, so it must be volatile for the write to
+   * become visible to that thread.
+   */
+  private volatile boolean stopReclaim = false;
+
+  /**
+   * Source of the current time in msecs. It is a separate class so that
+   * tests can substitute their own notion of time.
+   */
+  static class Clock {
+    long getTime() {
+      final long now = System.currentTimeMillis();
+      return now;
+    }
+  }
+  /** time source used to schedule and expire reclaim requests */
+  private Clock clock;
+
+
+  /** Creates a scheduler that reads the time from the system clock. */
+  public CapacityTaskScheduler() {
+    this(new Clock());
+  }
+
+  /**
+   * Creates a scheduler that reads the time from the given clock.
+   * Intended for testing, where a controllable clock is useful.
+   */
+  public CapacityTaskScheduler(Clock clock) {
+    this.jobQueuesManager = new JobQueuesManager(this);
+    this.clock = clock;
+  }
+
+  /**
+   * Supplies the queue configuration to use. Mostly for testing purposes;
+   * if this is not called before start(), start() creates a default one.
+   */
+  public void setResourceManagerConf(CapacitySchedulerConf conf) {
+    rmConf = conf;
+  }
+
+  /**
+   * Starts the scheduler. This reads the reclaim interval and the per-queue
+   * settings, and creates map/reduce scheduling info and a job queue for
+   * every configured queue. It then registers for job events and starts the
+   * reclaim-capacity thread. Calling it while already started does nothing.
+   *
+   * @throws IllegalArgumentException if the queues' guaranteed capacities
+   *         add up to more than 100%
+   * @throws IllegalStateException if no queue, or no default queue, is
+   *         configured
+   */
+  @Override
+  public synchronized void start() throws IOException {
+    if (started) return;
+    super.start();
+    // configured in seconds, kept in msecs
+    RECLAIM_CAPACITY_INTERVAL =
+      conf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
+    RECLAIM_CAPACITY_INTERVAL *= 1000;
+    // initialize our queues from the config settings
+    if (null == rmConf) {
+      rmConf = new CapacitySchedulerConf();
+    }
+    // read queue info from config file
+    Set<String> queues = taskTrackerManager.getQueueManager().getQueues();
+    float totalCapacity = 0.0f;
+    for (String queueName: queues) {
+      float gc = rmConf.getGuaranteedCapacity(queueName);
+      totalCapacity += gc;
+      int ulMin = rmConf.getMinimumUserLimitPercent(queueName);
+      // reclaimTimeLimit is the time(in millisec) within which we need to
+      // reclaim capacity.
+      long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName);
+      // create queue scheduling objects for Map and Reduce
+      mapScheduler.add(new QueueSchedulingInfo(queueName, gc,
+          ulMin, reclaimTimeLimit));
+      reduceScheduler.add(new QueueSchedulingInfo(queueName, gc,
+          ulMin, reclaimTimeLimit));
+
+      // create the queues of job objects
+      boolean supportsPrio = rmConf.isPrioritySupported(queueName);
+      jobQueuesManager.createQueue(queueName, supportsPrio);
+    }
+    if (totalCapacity > 100.0) {
+      throw new IllegalArgumentException("Sum of queue capacities over 100% at "
+          + totalCapacity);
+    }
+
+    // Sanity check: there should be at least one queue.
+    if (0 == mapScheduler.getNumQueues()) {
+      throw new IllegalStateException("System has no queue configured");
+    }
+
+    // check if there's a queue with the default name. If not, we quit.
+    if (!mapScheduler.isQueuePresent(DEFAULT_QUEUE_NAME)) {
+      throw new IllegalStateException("System has no default queue configured");
+    }
+
+    // listen to job changes
+    taskTrackerManager.addJobInProgressListener(jobQueuesManager);
+
+    // start thread for redistributing capacity. Clear any stop request left
+    // by an earlier terminate(); otherwise the new thread would quit after
+    // its first sleep.
+    stopReclaim = false;
+    this.reclaimCapacityThread =
+      new Thread(new ReclaimCapacity(),"reclaimCapacity");
+    // a background helper should not keep the JVM alive by itself
+    this.reclaimCapacityThread.setDaemon(true);
+    this.reclaimCapacityThread.start();
+    started = true;
+    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");
+  }
+
+  /**
+   * Stops the scheduler: stops listening to job changes and shuts down the
+   * reclaim-capacity thread. Does nothing if the scheduler is not started.
+   */
+  @Override
+  public synchronized void terminate() throws IOException {
+    if (!started) return;
+    if (jobQueuesManager != null) {
+      taskTrackerManager.removeJobInProgressListener(
+          jobQueuesManager);
+    }
+    // tell the reclaim thread to stop, and wake it if it is sleeping so it
+    // exits now rather than after up to RECLAIM_CAPACITY_INTERVAL msecs
+    // (ReclaimCapacity.run() exits on InterruptedException)
+    stopReclaim = true;
+    if (reclaimCapacityThread != null) {
+      reclaimCapacityThread.interrupt();
+    }
+    started = false;
+    super.terminate();
+  }
+
+  /** Stores the configuration; the override only adds synchronization. */
+  @Override
+  public synchronized void setConf(Configuration conf) {
+    super.setConf(conf);
+  }
+
+  /** Runs one reclaim pass for map tasks, then one for reduce tasks. */
+  void reclaimCapacity() {
+    TaskSchedulingMgr[] mgrs = { mapScheduler, reduceScheduler };
+    for (TaskSchedulingMgr mgr : mgrs) {
+      mgr.reclaimCapacity();
+    }
+  }
+
+  /**
+   * Test hook: refreshes the queue statistics and re-sorts the queues,
+   * first for maps, then for reduces.
+   */
+  void updateQSIInfo() {
+    TaskSchedulingMgr[] mgrs = { mapScheduler, reduceScheduler };
+    for (TaskSchedulingMgr mgr : mgrs) {
+      mgr.updateQSIObjects();
+      mgr.updateCollectionOfQSIs();
+    }
+  }
+
+ /*
+ * The grand plan for assigning a task.
+ * First, decide whether a Map or Reduce task should be given to a TT
+ * (if the TT can accept either).
+ * Next, pick a queue. We only look at queues that need a slot. Among
+ * these, we first look at queues whose ac is less than gc (queues that
+ * gave up capacity in the past). Next, we look at any other queue that
+ * needs a slot.
+ * Next, pick a job in a queue. we pick the job at the front of the queue
+ * unless its user is over the user limit.
+ * Finally, given a job, pick a task from the job.
+ *
+ */
+ @Override
+ public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+ throws IOException {
+
+ List<Task> tasks = null;
+ /*
+ * If TT has Map and Reduce slot free, we need to figure out whether to
+ * give it a Map or Reduce task.
+ * Number of ways to do this. For now, base decision on how much is needed
+ * versus how much is used (default to Map, if equal).
+ */
+ LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() +
+ ", run maps=" + taskTracker.countMapTasks() + ", max reds=" +
+ taskTracker.getMaxReduceTasks() + ", run reds=" +
+ taskTracker.countReduceTasks() + ", map cap=" +
+ mapScheduler.getClusterCapacity() + ", red cap = " +
+ reduceScheduler.getClusterCapacity());
+ int maxMapTasks = taskTracker.getMaxMapTasks();
+ int currentMapTasks = taskTracker.countMapTasks();
+ int maxReduceTasks = taskTracker.getMaxReduceTasks();
+ int currentReduceTasks = taskTracker.countReduceTasks();
+ if ((maxReduceTasks - currentReduceTasks) >
+ (maxMapTasks - currentMapTasks)) {
+ tasks = reduceScheduler.assignTasks(taskTracker);
+ // if we didn't get any, look at map tasks, if TT has space
+ if ((null == tasks) && (maxMapTasks > currentMapTasks)) {
+ tasks = mapScheduler.assignTasks(taskTracker);
+ }
+ }
+ else {
+ tasks = mapScheduler.assignTasks(taskTracker);
+ // if we didn't get any, look at red tasks, if TT has space
+ if ((null == tasks) && (maxReduceTasks > currentReduceTasks)) {
+ tasks = reduceScheduler.assignTasks(taskTracker);
+ }
+ }
+ return tasks;
+ }
+
+  /**
+   * Called when a job is added. Both the map and the reduce manager keep
+   * per-user bookkeeping, so both are told (maps first).
+   */
+  synchronized void jobAdded(JobInProgress job) {
+    TaskSchedulingMgr[] mgrs = { mapScheduler, reduceScheduler };
+    for (TaskSchedulingMgr mgr : mgrs) {
+      mgr.jobAdded(job);
+    }
+  }
+
+  /**
+   * Called when a job is removed. Both the map and the reduce manager keep
+   * per-user bookkeeping, so both are told (maps first).
+   */
+  synchronized void jobRemoved(JobInProgress job) {
+    TaskSchedulingMgr[] mgrs = { mapScheduler, reduceScheduler };
+    for (TaskSchedulingMgr mgr : mgrs) {
+      mgr.jobRemoved(job);
+    }
+  }
+
+}
+
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link JobInProgressListener} that maintains the jobs being managed in
+ * one or more queues.
+ */
+class JobQueuesManager extends JobInProgressListener {
+
+ /*
+ * If a queue supports priorities, waiting jobs must be
+ * sorted on priorities, and then on their start times (technically,
+ * their insertion time.
+ * If a queue doesn't support priorities, waiting jobs are
+ * sorted based on their start time.
+ * Running jobs are not sorted. A job that started running earlier
+ * is ahead in the queue, so insertion should be at the tail.
+ */
+
+ // comparator for jobs in queues that support priorities
+ private static final Comparator<JobInProgress> PRIORITY_JOB_COMPARATOR
+ = new Comparator<JobInProgress>() {
+ public int compare(JobInProgress o1, JobInProgress o2) {
+ // Look at priority.
+ int res = o1.getPriority().compareTo(o2.getPriority());
+ if (res == 0) {
+ // the job that started earlier wins
+ if (o1.getStartTime() < o2.getStartTime()) {
+ res = -1;
+ } else {
+ res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
+ }
+ }
+ if (res == 0) {
+ res = o1.getJobID().compareTo(o2.getJobID());
+ }
+ return res;
+ }
+ };
+ // comparator for jobs in queues that don't support priorities
+ private static final Comparator<JobInProgress> STARTTIME_JOB_COMPARATOR
+ = new Comparator<JobInProgress>() {
+ public int compare(JobInProgress o1, JobInProgress o2) {
+ // the job that started earlier wins
+ if (o1.getStartTime() < o2.getStartTime()) {
+ return -1;
+ } else {
+ return (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
+ }
+ }
+ };
+
+ // class to store queue info
+ private static class QueueInfo {
+
+ // whether the queue supports priorities
+ boolean supportsPriorities;
+ // maintain separate collections of running & waiting jobs. This we do
+ // mainly because when a new job is added, it cannot superceede a running
+ // job, even though the latter may be a lower priority. If this is ever
+ // changed, we may get by with one collection.
+ Collection<JobInProgress> waitingJobs;
+ Collection<JobInProgress> runningJobs;
+
+ QueueInfo(boolean prio) {
+ this.supportsPriorities = prio;
+ if (supportsPriorities) {
+ this.waitingJobs = new TreeSet<JobInProgress>(PRIORITY_JOB_COMPARATOR);
+ }
+ else {
+ this.waitingJobs = new TreeSet<JobInProgress>(STARTTIME_JOB_COMPARATOR);
+ }
+ this.runningJobs = new LinkedList<JobInProgress>();
+ }
+
+ /**
+ * we need to delete an object from our TreeSet based on referential
+ * equality, rather than value equality that the TreeSet uses.
+ * Another way to do this is to extend the TreeSet and override remove().
+ */
+ static private boolean removeOb(Collection<JobInProgress> c, Object o) {
+ Iterator<JobInProgress> i = c.iterator();
+ while (i.hasNext()) {
+ if (i.next() == o) {
+ i.remove();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+
+ // we maintain a hashmap of queue-names to queue info
+ private Map<String, QueueInfo> jobQueues =
+ new HashMap<String, QueueInfo>();
+ private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
+ private CapacityTaskScheduler scheduler;
+
+
+ JobQueuesManager(CapacityTaskScheduler s) {
+ this.scheduler = s;
+ }
+
+ /**
+ * create an empty queue with the default comparator
+ * @param queueName The name of the queue
+ * @param supportsPriotities whether the queue supports priorities
+ */
+ public void createQueue(String queueName, boolean supportsPriotities) {
+ jobQueues.put(queueName, new QueueInfo(supportsPriotities));
+ }
+
+ /**
+ * Returns the queue of running jobs associated with the name
+ */
+ public Collection<JobInProgress> getRunningJobQueue(String queueName) {
+ return jobQueues.get(queueName).runningJobs;
+ }
+
+ /**
+ * Returns the queue of waiting jobs associated with the name
+ */
+ public Collection<JobInProgress> getWaitingJobQueue(String queueName) {
+ return jobQueues.get(queueName).waitingJobs;
+ }
+
+ @Override
+ public void jobAdded(JobInProgress job) {
+ LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
+ // add job to the right queue
+ QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+ if (null == qi) {
+ // job was submitted to a queue we're not aware of
+ LOG.warn("Invalid queue " + job.getProfile().getQueueName() +
+ " specified for job" + job.getProfile().getJobID() +
+ ". Ignoring job.");
+ return;
+ }
+ // add job to waiting queue. It will end up in the right place,
+ // based on priority.
+ // We use our own version of removing objects based on referential
+ // equality, since the 'job' object has already been changed.
+ qi.waitingJobs.add(job);
+ // let scheduler know.
+ scheduler.jobAdded(job);
+ }
+
+ @Override
+ public void jobRemoved(JobInProgress job) {
+ QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+ if (null == qi) {
+ // can't find queue for job. Shouldn't happen.
+ LOG.warn("Could not find queue " + job.getProfile().getQueueName() +
+ " when removing job " + job.getProfile().getJobID());
+ return;
+ }
+ // job could be in running or waiting queue
+ if (!qi.runningJobs.remove(job)) {
+ QueueInfo.removeOb(qi.waitingJobs, job);
+ }
+ // let scheduler know
+ scheduler.jobRemoved(job);
+ }
+
+ @Override
+ public void jobUpdated(JobInProgress job) {
+ QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+ if (null == qi) {
+ // can't find queue for job. Shouldn't happen.
+ LOG.warn("Could not find queue " + job.getProfile().getQueueName() +
+ " when updating job " + job.getProfile().getJobID());
+ return;
+ }
+ // this is called when a job's priority or state is changed.
+ // since we don't know the job's previous state, we need to
+ // find out in which queue it was earlier, and then place it in the
+ // right queue.
+ Collection<JobInProgress> dest = (job.getStatus().getRunState() ==
+ JobStatus.PREP)? qi.waitingJobs: qi.runningJobs;
+ // We use our own version of removing objects based on referential
+ // equality, since the 'job' object has already been changed.
+ if (!QueueInfo.removeOb(qi.waitingJobs, job)) {
+ qi.runningJobs.remove(job);
+ }
+ dest.add(job);
+ }
+
+}