You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [4/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Sat Nov 28 20:26:01 2009
@@ -20,21 +20,19 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.AbstractQueue.AbstractQueueComparator;
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
-import org.apache.hadoop.mapred.TaskTrackerStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.util.StringUtils;
/**
* A {@link TaskScheduler} that implements the requirements in HADOOP-3421
@@ -57,218 +55,18 @@
*
*/
class CapacityTaskScheduler extends TaskScheduler {
-
- /***********************************************************************
- * Keeping track of scheduling information for queues
- *
- * We need to maintain scheduling information relevant to a queue (its
- * name, capacity, etc), along with information specific to
- * each kind of task, Map or Reduce (num of running tasks, pending
- * tasks etc).
- *
- * This scheduling information is used to decide how to allocate
- * tasks, redistribute capacity, etc.
- *
- * A QueueSchedulingInfo(QSI) object represents scheduling information for
- * a queue. A TaskSchedulingInfo (TSI) object represents scheduling
- * information for a particular kind of task (Map or Reduce).
- *
- **********************************************************************/
-
- private static class TaskSchedulingInfo {
-
- private static final String LIMIT_NORMALIZED_CAPACITY_STRING
- = "(Capacity is restricted to max limit of %d slots.\n" +
- "Remaining %d slots will be used by other queues.)\n";
- /**
- * the actual capacity, which depends on how many slots are available
- * in the cluster at any given time.
- */
- private int capacity = 0;
- // number of running tasks
- int numRunningTasks = 0;
- // number of slots occupied by running tasks
- int numSlotsOccupied = 0;
-
- /**
- * max task limit
- * This value is the maximum slots that can be used in a
- * queue at any point of time. So for example assuming above config value
- * is 100 , not more than 100 tasks would be in the queue at any point of
- * time, assuming each task takes one slot.
- */
- private int maxTaskLimit = -1;
-
- /**
- * for each user, we need to keep track of number of slots occupied by
- * running tasks
- */
- Map<String, Integer> numSlotsOccupiedByUser =
- new HashMap<String, Integer>();
-
- /**
- * reset the variables associated with tasks
- */
- void resetTaskVars() {
- numRunningTasks = 0;
- numSlotsOccupied = 0;
- for (String s: numSlotsOccupiedByUser.keySet()) {
- numSlotsOccupiedByUser.put(s, Integer.valueOf(0));
- }
- }
-
-
- int getMaxTaskLimit() {
- return maxTaskLimit;
- }
-
- void setMaxTaskLimit(int maxTaskCap) {
- this.maxTaskLimit = maxTaskCap;
- }
-
- /**
- * This method checks for maxTaskLimit and sends minimum of maxTaskLimit and
- * capacity.
- * @return
- */
- int getCapacity() {
- return ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) ? maxTaskLimit :
- capacity;
- }
-
- /**
- * Mutator method for capacity
- * @param capacity
- */
- void setCapacity(int capacity) {
- this.capacity = capacity;
- }
+ /** quick way to get qsc object given a queue name */
+ private Map<String, QueueSchedulingContext> queueInfoMap =
+ new HashMap<String, QueueSchedulingContext>();
+
+ //Root-level queue. It has all the
+ //cluster capacity at its disposal.
+ //Queues declared by users are
+ //children of this queue. The capacity
+ //scheduler (CS) keeps a handle to the root.
+ private AbstractQueue root = null;
- /**
- * return information about the tasks
- */
- @Override
- public String toString() {
- float occupiedSlotsAsPercent =
- getCapacity() != 0 ?
- ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
- StringBuffer sb = new StringBuffer();
-
- sb.append("Capacity: " + capacity + " slots\n");
- //If maxTaskLimit is less than the capacity
- if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
- sb.append(String.format(LIMIT_NORMALIZED_CAPACITY_STRING,
- maxTaskLimit, (capacity-maxTaskLimit)));
- }
- if (maxTaskLimit >= 0) {
- sb.append(String.format("Maximum Slots Limit: %d\n", maxTaskLimit));
- }
- sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
- Integer.valueOf(numSlotsOccupied), Float
- .valueOf(occupiedSlotsAsPercent)));
- sb.append(String.format("Running tasks: %d\n", Integer
- .valueOf(numRunningTasks)));
- // include info on active users
- if (numSlotsOccupied != 0) {
- sb.append("Active users:\n");
- for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
- .entrySet()) {
- if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
- // user has no tasks running
- continue;
- }
- sb.append("User '" + entry.getKey() + "': ");
- int numSlotsOccupiedByThisUser = entry.getValue().intValue();
- float p =
- (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
- sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
- .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
- }
- }
- return sb.toString();
- }
- }
-
- private static class QueueSchedulingInfo {
- String queueName;
-
- /** capacity(%) is set in the config */
- float capacityPercent = 0;
-
- /**
- * to handle user limits, we need to know how many users have jobs in
- * the queue.
- */
- Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
-
- /** min value of user limit (same for all users) */
- int ulMin;
-
- /**
- * We keep track of the JobQueuesManager only for reporting purposes
- * (in toString()).
- */
- private JobQueuesManager jobQueuesManager;
-
- /**
- * We keep a TaskSchedulingInfo object for each kind of task we support
- */
- TaskSchedulingInfo mapTSI;
- TaskSchedulingInfo reduceTSI;
-
- public QueueSchedulingInfo(String queueName, float capacityPercent,
- int ulMin, JobQueuesManager jobQueuesManager,
- int mapCap, int reduceCap) {
- this.queueName = new String(queueName);
- this.capacityPercent = capacityPercent;
- this.ulMin = ulMin;
- this.jobQueuesManager = jobQueuesManager;
- this.mapTSI = new TaskSchedulingInfo();
- this.reduceTSI = new TaskSchedulingInfo();
- this.mapTSI.setMaxTaskLimit(mapCap);
- this.reduceTSI.setMaxTaskLimit(reduceCap);
- }
-
- /**
- * return information about the queue
- * @return a String representing the information about the queue.
- */
- @Override
- public String toString(){
- // We print out the queue information first, followed by info
- // on map and reduce tasks and job info
- StringBuffer sb = new StringBuffer();
- sb.append("Queue configuration\n");
- sb.append("Capacity Percentage: ");
- sb.append(capacityPercent);
- sb.append("%\n");
- sb.append(String.format("User Limit: %d%s\n",ulMin, "%"));
- sb.append(String.format("Priority Supported: %s\n",
- (jobQueuesManager.doesQueueSupportPriorities(queueName))?
- "YES":"NO"));
- sb.append("-------------\n");
-
- sb.append("Map tasks\n");
- sb.append(mapTSI.toString());
- sb.append("-------------\n");
- sb.append("Reduce tasks\n");
- sb.append(reduceTSI.toString());
- sb.append("-------------\n");
-
- sb.append("Job info\n");
- sb.append(String.format("Number of Waiting Jobs: %d\n",
- jobQueuesManager.getWaitingJobCount(queueName)));
- sb.append(String.format("Number of users who have submitted jobs: %d\n",
- numJobsByUser.size()));
- return sb.toString();
- }
- }
-
- /** quick way to get qsi object given a queue name */
- private Map<String, QueueSchedulingInfo> queueInfoMap =
- new HashMap<String, QueueSchedulingInfo>();
-
/**
* This class captures scheduling information we want to display or log.
*/
@@ -283,16 +81,17 @@
@Override
public String toString(){
- // note that we do not call updateQSIObjects() here for performance
+ // note that we do not call updateContextObjects() here for performance
// reasons. This means that the data we print out may be slightly
// stale. This data is updated whenever assignTasks() is called
// If this doesn't happen, the data gets stale. If we see
// this often, we may need to detect this situation and call
- // updateQSIObjects(), or just call it each time.
+ // updateContextObjects(), or just call it each time.
return scheduler.getDisplayInfo(queueName);
}
}
+
// this class encapsulates the result of a task lookup
private static class TaskLookupResult {
@@ -318,6 +117,7 @@
}
static TaskLookupResult getTaskFoundResult(Task t) {
+ LOG.debug("Returning task " + t);
return new TaskLookupResult(t, LookUpStatus.TASK_FOUND);
}
static TaskLookupResult getNoTaskFoundResult() {
@@ -337,11 +137,11 @@
}
}
- /**
- * This class handles the scheduling algorithms.
- * The algos are the same for both Map and Reduce tasks.
+ /**
+ * This class handles the scheduling algorithms.
+ * The algos are the same for both Map and Reduce tasks.
* There may be slight variations later, in which case we can make this
- * an abstract base class and have derived classes for Map and Reduce.
+ * an abstract base class and have derived classes for Map and Reduce.
*/
private static abstract class TaskSchedulingMgr {
@@ -349,69 +149,40 @@
protected CapacityTaskScheduler scheduler;
protected TaskType type = null;
- abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
+ abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
JobInProgress job) throws IOException;
- int getSlotsOccupied(JobInProgress job) {
- return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) *
- getSlotsPerTask(job);
- }
-
abstract int getClusterCapacity();
- abstract int getSlotsPerTask(JobInProgress job);
- abstract int getRunningTasks(JobInProgress job);
- abstract int getPendingTasks(JobInProgress job);
- abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
- abstract int getNumReservedTaskTrackers(JobInProgress job);
-
+ abstract TaskSchedulingContext getTSC(
+ QueueSchedulingContext qsc);
/**
* To check if job has a speculative task on the particular tracker.
- *
+ *
* @param job job to check for speculative tasks.
* @param tts task tracker on which speculative task would run.
* @return true if there is a speculative task to run on the tracker.
*/
- abstract boolean hasSpeculativeTask(JobInProgress job,
+ abstract boolean hasSpeculativeTask(JobInProgress job,
TaskTrackerStatus tts);
/**
- * Check if the given job has sufficient reserved tasktrackers for all its
- * pending tasks.
- *
- * @param job job to check for sufficient reserved tasktrackers
- * @return <code>true</code> if the job has reserved tasktrackers,
- * else <code>false</code>
- */
- boolean hasSufficientReservedTaskTrackers(JobInProgress job) {
- return getNumReservedTaskTrackers(job) >= getPendingTasks(job);
- }
-
- /**
- * List of QSIs for assigning tasks.
- * Queues are ordered by a ratio of (# of running tasks)/capacity, which
- * indicates how much 'free space' the queue has, or how much it is over
- * capacity. This ordered list is iterated over, when assigning tasks.
- */
- private List<QueueSchedulingInfo> qsiForAssigningTasks =
- new ArrayList<QueueSchedulingInfo>();
-
- /**
* Comparator to sort queues.
- * For maps, we need to sort on QueueSchedulingInfo.mapTSI. For
- * reducers, we use reduceTSI. So we'll need separate comparators.
- */
- private static abstract class QueueComparator
- implements Comparator<QueueSchedulingInfo> {
- abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
- public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
- TaskSchedulingInfo t1 = getTSI(q1);
- TaskSchedulingInfo t2 = getTSI(q2);
+ * For maps, we need to sort on QueueSchedulingContext.mapTSC. For
+ * reducers, we use reduceTSC. So we'll need separate comparators.
+ */
+ private static abstract class QueueComparator
+ implements Comparator<AbstractQueue> {
+ abstract TaskSchedulingContext getTSC(
+ QueueSchedulingContext qsi);
+ public int compare(AbstractQueue q1, AbstractQueue q2) {
+ TaskSchedulingContext t1 = getTSC(q1.getQueueSchedulingContext());
+ TaskSchedulingContext t2 = getTSC(q2.getQueueSchedulingContext());
// look at how much capacity they've filled. Treat a queue with
// capacity=0 equivalent to a queue running at capacity
double r1 = (0 == t1.getCapacity())? 1.0f:
- (double)t1.numSlotsOccupied/(double) t1.getCapacity();
+ (double) t1.getNumSlotsOccupied() /(double) t1.getCapacity();
double r2 = (0 == t2.getCapacity())? 1.0f:
- (double)t2.numSlotsOccupied/(double) t2.getCapacity();
+ (double) t2.getNumSlotsOccupied() /(double) t2.getCapacity();
if (r1<r2) return -1;
else if (r1>r2) return 1;
else return 0;
@@ -419,67 +190,78 @@
}
// subclass for map and reduce comparators
private static final class MapQueueComparator extends QueueComparator {
- TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
- return qsi.mapTSI;
+ TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+ return qsi.getMapTSC();
}
}
private static final class ReduceQueueComparator extends QueueComparator {
- TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
- return qsi.reduceTSI;
+ TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+ return qsi.getReduceTSC();
}
}
+
// these are our comparator instances
- protected final static MapQueueComparator mapComparator = new MapQueueComparator();
- protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
+ protected final static MapQueueComparator mapComparator =
+ new MapQueueComparator();
+ protected final static ReduceQueueComparator reduceComparator =
+ new ReduceQueueComparator();
// and this is the comparator to use
protected QueueComparator queueComparator;
// Returns queues sorted according to the QueueComparator.
// Mainly for testing purposes.
String[] getOrderedQueues() {
- List<String> queues = new ArrayList<String>(qsiForAssigningTasks.size());
- for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
- queues.add(qsi.queueName);
+ List<AbstractQueue> queueList = getOrderedJobQueues();
+ List<String> queues = new ArrayList<String>(queueList.size());
+ for (AbstractQueue q : queueList) {
+ queues.add(q.getName());
}
return queues.toArray(new String[queues.size()]);
}
+ /**
+ * Return an ordered list of {@link JobQueue}s wrapped as
+ * {@link AbstractQueue}s. Ordering is according to {@link QueueComparator}.
+ * To reflect the true ordering of the JobQueues, the complete hierarchy is
+ * sorted such that {@link AbstractQueue}s are ordered according to their
+ * needs at each level in the hierarchy, after which only the leaf level
+ * {@link JobQueue}s are returned.
+ *
+ * @return a list of {@link JobQueue}s wrapped as {@link AbstractQueue}s
+ * sorted by their needs.
+ */
+ List<AbstractQueue> getOrderedJobQueues() {
+ scheduler.root.sort(queueComparator);
+ return scheduler.root.getDescendentJobQueues();
+ }
+
TaskSchedulingMgr(CapacityTaskScheduler sched) {
scheduler = sched;
}
-
- // let the scheduling mgr know which queues are in the system
- void initialize(Map<String, QueueSchedulingInfo> qsiMap) {
- // add all the qsi objects to our list and sort
- qsiForAssigningTasks.addAll(qsiMap.values());
- Collections.sort(qsiForAssigningTasks, queueComparator);
- }
-
- private synchronized void updateCollectionOfQSIs() {
- Collections.sort(qsiForAssigningTasks, queueComparator);
- }
-
- private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+ private boolean isUserOverLimit(JobInProgress j,
+ QueueSchedulingContext qsc) {
// what is our current capacity? It is equal to the queue-capacity if
// we're running below capacity. If we're running over capacity, then its
// #running plus slotPerTask of the job (which is the number of extra
// slots we're getting).
int currentCapacity;
- TaskSchedulingInfo tsi = getTSI(qsi);
- if (tsi.numSlotsOccupied < tsi.getCapacity()) {
+ TaskSchedulingContext tsi = getTSC(qsc);
+ if (tsi.getNumSlotsOccupied() < tsi.getCapacity()) {
currentCapacity = tsi.getCapacity();
}
else {
- currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
+ currentCapacity =
+ tsi.getNumSlotsOccupied() +
+ TaskDataView.getTaskDataView(type).getSlotsPerTask(j);
}
int limit = Math.max((int)(Math.ceil((double)currentCapacity/
- (double)qsi.numJobsByUser.size())),
- (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
+ (double) qsc.getNumJobsByUser().size())),
+ (int)(Math.ceil((double)(qsc.getUlMin() *currentCapacity)/100.0)));
String user = j.getProfile().getUser();
- if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
- LOG.debug("User " + user + " is over limit, num slots occupied = " +
- tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit);
+ if (tsi.getNumSlotsOccupiedByUser().get(user) >= limit) {
+ LOG.debug("User " + user + " is over limit, num slots occupied = " +
+ tsi.getNumSlotsOccupiedByUser().get(user) + ", limit = " + limit);
return true;
}
else {
@@ -488,29 +270,36 @@
}
/*
- * This is the central scheduling method.
- * It tries to get a task from jobs in a single queue.
- * Always return a TaskLookupResult object. Don't return null.
+ * This is the central scheduling method.
+ * It tries to get a task from jobs in a single queue.
+ * Always return a TaskLookupResult object. Don't return null.
*/
private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
- QueueSchedulingInfo qsi)
+ QueueSchedulingContext qsi)
throws IOException {
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
// we only look at jobs in the running queues, as these are the ones
// who have been potentially initialized
- for (JobInProgress j :
- scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
- // only look at jobs that can be run. We ignore jobs that haven't
- // initialized, or have completed but haven't been removed from the
- // running queue.
+ for (JobInProgress j :
+ scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+ .getRunningJobs()) {
+ // only look at jobs that can be run. We ignore jobs that haven't
+ // initialized, or have completed but haven't been removed from the
+ // running queue.
+
+ //Check whether the queue would exceed its maximum capacity.
+ if(areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+ continue;
+ }
+
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
// check if the job's user is over limit
if (isUserOverLimit(j, qsi)) {
continue;
- }
+ }
//If this job meets memory requirements. Ask the JobInProgress for
//a task to be scheduled on the task tracker.
//if we find a job then we pass it on.
@@ -526,7 +315,6 @@
//skip to the next job in the queue.
LOG.debug("Job " + j.getJobID().toString()
+ " returned no tasks of type " + type);
- continue;
}
} else {
// if memory requirements don't match then we check if the job has
@@ -534,7 +322,9 @@
// tasktrackers to cover all pending tasks. If so we reserve the
// current tasktracker for this job so that high memory jobs are not
// starved
- if ((getPendingTasks(j) != 0 && !hasSufficientReservedTaskTrackers(j))) {
+ TaskDataView view = TaskDataView.getTaskDataView(type);
+ if ((view.getPendingTasks(j) != 0 &&
+ !view.hasSufficientReservedTaskTrackers(j))) {
// Reserve all available slots on this tasktracker
LOG.info(j.getJobID() + ": Reserving "
+ taskTracker.getTrackerName()
@@ -549,28 +339,36 @@
// if we're here, this job has no task to run. Look at the next job.
}//end of for loop
- // if we're here, we haven't found any task to run among all jobs in
- // the queue. This could be because there is nothing to run, or that
- // the user limit for some user is too strict, i.e., there's at least
- // one user who doesn't have enough tasks to satisfy his limit. If
- // it's the latter case, re-look at jobs without considering user
+ // if we're here, we haven't found any task to run among all jobs in
+ // the queue. This could be because there is nothing to run, or that
+ // the user limit for some user is too strict, i.e., there's at least
+ // one user who doesn't have enough tasks to satisfy his limit. If
+ // it's the latter case, re-look at jobs without considering user
// limits, and get a task from the first eligible job; however
- // we do not 'reserve' slots on tasktrackers anymore since the user is
+ // we do not 'reserve' slots on tasktrackers anymore since the user is
// already over the limit
- // Note: some of the code from above is repeated here. This is on
- // purpose as it improves overall readability.
+ // Note: some of the code from above is repeated here. This is on
+ // purpose as it improves overall readability.
// Note: we walk through jobs again. Some of these jobs, which weren't
- // considered in the first pass, shouldn't be considered here again,
+ // considered in the first pass, shouldn't be considered here again,
// but we still check for their viability to keep the code simple. In
- // some cases, for high mem jobs that have nothing to run, we call
- // obtainNewTask() unnecessarily. Should this be a problem, we can
- // create a list of jobs to look at (those whose users were over
- // limit) in the first pass and walk through that list only.
- for (JobInProgress j :
- scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+ // some cases, for high mem jobs that have nothing to run, we call
+ // obtainNewTask() unnecessarily. Should this be a problem, we can
+ // create a list of jobs to look at (those whose users were over
+ // limit) in the first pass and walk through that list only.
+ for (JobInProgress j :
+ scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+ .getRunningJobs()) {
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
+
+ //Check for the maximum-capacity.
+ if(areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+ continue;
+ }
+
+
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
taskTrackerStatus)) {
// We found a suitable job. Get task from it.
@@ -580,87 +378,89 @@
// we're successful in getting a task
return TaskLookupResult.getTaskFoundResult(t);
} else {
- //skip to the next job in the queue.
- continue;
}
} else {
- //if memory requirements don't match then we check if the
+ //if memory requirements don't match then we check if the
//job has either pending or speculative task. If the job
//has pending or speculative task we block till this job
- //tasks get scheduled, so that high memory jobs are not
+ //tasks get scheduled, so that high memory jobs are not
//starved
- if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus)) {
+ if (TaskDataView.getTaskDataView(type).getPendingTasks(j) != 0 ||
+ hasSpeculativeTask(j, taskTrackerStatus)) {
return TaskLookupResult.getMemFailedResult();
- }
+ }
}//end of memory check block
}//end of for loop
// found nothing for this queue, look at the next one.
- String msg = "Found no task from the queue " + qsi.queueName;
+ String msg = "Found no task from the queue " + qsi.getQueueName();
LOG.debug(msg);
return TaskLookupResult.getNoTaskFoundResult();
}
- // Always return a TaskLookupResult object. Don't return null.
- // The caller is responsible for ensuring that the QSI objects and the
+ // Always return a TaskLookupResult object. Don't return null.
+ // The caller is responsible for ensuring that the QSC objects and the
// collections are up-to-date.
- private TaskLookupResult assignTasks(TaskTracker taskTracker)
+ private TaskLookupResult assignTasks(TaskTracker taskTracker)
throws IOException {
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
- printQSIs();
+ printQSCs();
// Check if this tasktracker has been reserved for a job...
JobInProgress job = taskTracker.getJobForFallowSlot(type);
if (job != null) {
int availableSlots = taskTracker.getAvailableSlots(type);
if (LOG.isDebugEnabled()) {
- LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " +
- taskTracker.getTrackerName() + " with " + availableSlots +
+ LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " +
+ taskTracker.getTrackerName() + " with " + availableSlots +
" '" + type + "' slots");
}
if (availableSlots >= job.getNumSlotsPerTask(type)) {
- // Unreserve
+ // Unreserve
taskTracker.unreserveSlots(type, job);
-
+
// We found a suitable job. Get task from it.
Task t = obtainNewTask(taskTrackerStatus, job);
//if there is a task return it immediately.
if (t != null) {
if (LOG.isDebugEnabled()) {
- LOG.info(job.getJobID() + ": Got " + t.getTaskID() +
- " for reserved tasktracker " +
+ LOG.info(job.getJobID() + ": Got " + t.getTaskID() +
+ " for reserved tasktracker " +
taskTracker.getTrackerName());
}
// we're successful in getting a task
return TaskLookupResult.getTaskFoundResult(t);
- }
+ }
} else {
// Re-reserve the current tasktracker
taskTracker.reserveSlots(type, job, availableSlots);
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug(job.getJobID() + ": Re-reserving " +
+ LOG.debug(job.getJobID() + ": Re-reserving " +
taskTracker.getTrackerName());
}
- return TaskLookupResult.getMemFailedResult();
+ return TaskLookupResult.getMemFailedResult();
}
}
-
-
- for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
- // we may have queues with capacity=0. We shouldn't look at jobs from
+
+ for (AbstractQueue q : getOrderedJobQueues()) {
+ QueueSchedulingContext qsc = q.getQueueSchedulingContext();
+ // we may have queues with capacity=0. We shouldn't look at jobs from
// these queues
- if (0 == getTSI(qsi).getCapacity()) {
+ if (0 == getTSC(qsc).getCapacity()) {
continue;
}
-
- if(this.areTasksInQueueOverLimit(qsi)) {
+
+ //This call is an important optimization: if the queue has already
+ //reached its limit, there is no need to traverse it.
+ if(this.areTasksInQueueOverMaxCapacity(qsc,1)) {
continue;
}
- TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
+
+ TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc);
TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
@@ -672,7 +472,7 @@
return tlr;
}
// if there was a memory mismatch, return
- else if (lookUpStatus ==
+ else if (lookUpStatus ==
TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {
return tlr;
}
@@ -684,22 +484,32 @@
/**
- * Check if the max task limit is set for this queue
- * if set , ignore this qsi if current num of occupied
- * slots of a TYPE in the queue is >= getMaxTaskCap().
- * @param qsi
- * @return
+ * Check whether a maximum-capacity is set for this queue.
+ * If it is set (i.e. it is not negative),
+ * check whether numSlotsOccupied + noOfSlotsPerTask is greater than the
+ * maximum-capacity; if so, this queue is over its limit.
+ *
+ * In case numSlotsOccupied is less than the maximum-capacity, but
+ * numSlotsOccupied + noOfSlotsPerTask is more than the maximum-capacity, we
+ * still do not assign the task. This may lead to under-utilization of a very
+ * small set of slots, but this is OK, as we strictly respect the maximum-capacity.
+ * @param qsc the scheduling context of the queue to check
+ * @param noOfSlotsPerTask the number of slots the task to be assigned would occupy
+ * @return true if queue is over maximum-capacity
*/
-
- private boolean areTasksInQueueOverLimit(QueueSchedulingInfo qsi) {
- TaskSchedulingInfo tsi = getTSI(qsi);
- if (tsi.getMaxTaskLimit() >= 0) {
- if (tsi.numSlotsOccupied >= tsi.getCapacity()) {
+ private boolean areTasksInQueueOverMaxCapacity(
+ QueueSchedulingContext qsc,int noOfSlotsPerTask) {
+ TaskSchedulingContext tsi = getTSC(qsc);
+ //check for maximum-capacity
+ if(tsi.getMaxCapacity() >= 0) {
+ if ((tsi.getNumSlotsOccupied() + noOfSlotsPerTask) >
+ tsi.getMaxCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Queue " + qsi.queueName + " has reached its max " + type +
- " limit ");
+ "Queue " + qsc.getQueueName() + " " + "has reached its max " +
+ type + "Capacity");
LOG.debug("Current running tasks " + tsi.getCapacity());
+
}
return true;
}
@@ -707,43 +517,45 @@
return false;
}
+
// for debugging.
- private void printQSIs() {
+ private void printQSCs() {
if (LOG.isDebugEnabled()) {
StringBuffer s = new StringBuffer();
- for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
- TaskSchedulingInfo tsi = getTSI(qsi);
+ for (AbstractQueue aq: getOrderedJobQueues()) {
+ QueueSchedulingContext qsi = aq.getQueueSchedulingContext();
+ TaskSchedulingContext tsi = getTSC(qsi);
Collection<JobInProgress> runJobs =
- scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+ scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+ .getRunningJobs();
s.append(
String.format(
" Queue '%s'(%s): runningTasks=%d, "
- + "occupiedSlots=%d, capacity=%d, runJobs=%d maxTaskLimit=%d ",
- qsi.queueName,
- this.type, Integer.valueOf(tsi.numRunningTasks), Integer
- .valueOf(tsi.numSlotsOccupied), Integer
- .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
- Integer.valueOf(tsi.getMaxTaskLimit())));
+ + "occupiedSlots=%d, capacity=%d, runJobs=%d maximumCapacity=%d ",
+ qsi.getQueueName(),
+ this.type, tsi.getNumRunningTasks(),
+ tsi.getNumSlotsOccupied(), tsi.getCapacity(), (runJobs.size()),
+ tsi.getMaxCapacity()));
}
LOG.debug(s);
}
}
-
+
/**
- * Check if one of the tasks have a speculative task to execute on the
+ * Check if one of the tasks have a speculative task to execute on the
* particular task tracker.
- *
+ *
* @param tips tasks of a job
- * @param progress percentage progress of the job
* @param tts task tracker status for which we are asking speculative tip
* @return true if job has a speculative task to run on particular TT.
*/
- boolean hasSpeculativeTask(TaskInProgress[] tips, float progress,
- TaskTrackerStatus tts) {
+ boolean hasSpeculativeTask(
+ TaskInProgress[] tips,
+ TaskTrackerStatus tts) {
long currentTime = System.currentTimeMillis();
for(TaskInProgress tip : tips) {
- if(tip.isRunning()
- && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName()))
+ if(tip.isRunning()
+ && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName()))
&& tip.canBeSpeculated(currentTime)) {
return true;
}
@@ -753,7 +565,7 @@
}
/**
- * The scheduling algorithms for map tasks.
+ * The scheduling algorithms for map tasks.
*/
private static class MapSchedulingMgr extends TaskSchedulingMgr {
@@ -764,12 +576,12 @@
}
@Override
- Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
+ Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
throws IOException {
- ClusterStatus clusterStatus =
+ ClusterStatus clusterStatus =
scheduler.taskTrackerManager.getClusterStatus();
int numTaskTrackers = clusterStatus.getTaskTrackers();
- return job.obtainNewMapTask(taskTracker, numTaskTrackers,
+ return job.obtainNewMapTask(taskTracker, numTaskTrackers,
scheduler.taskTrackerManager.getNumberOfUniqueHosts());
}
@@ -779,43 +591,24 @@
}
@Override
- int getRunningTasks(JobInProgress job) {
- return job.runningMaps();
- }
-
- @Override
- int getPendingTasks(JobInProgress job) {
- return job.pendingMaps();
+ TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+ return qsi.getMapTSC();
}
- @Override
- int getSlotsPerTask(JobInProgress job) {
- return
- job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());
- }
-
- @Override
- TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
- return qsi.mapTSI;
- }
-
- int getNumReservedTaskTrackers(JobInProgress job) {
- return job.getNumReservedTaskTrackersForMaps();
- }
@Override
boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
- //Check if job supports speculative map execution first then
+ //Check if job supports speculative map execution first then
//check if job has speculative maps.
return (job.getJobConf().getMapSpeculativeExecution())&& (
- hasSpeculativeTask(job.getMapTasks(),
- job.getStatus().mapProgress(), tts));
+ hasSpeculativeTask(job.getMapTasks(),
+ tts));
}
}
/**
- * The scheduling algorithms for reduce tasks.
+ * The scheduling algorithms for reduce tasks.
*/
private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
@@ -826,12 +619,12 @@
}
@Override
- Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
+ Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
throws IOException {
- ClusterStatus clusterStatus =
+ ClusterStatus clusterStatus =
scheduler.taskTrackerManager.getClusterStatus();
int numTaskTrackers = clusterStatus.getTaskTrackers();
- return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+ return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
scheduler.taskTrackerManager.getNumberOfUniqueHosts());
}
@@ -842,28 +635,8 @@
}
@Override
- int getRunningTasks(JobInProgress job) {
- return job.runningReduces();
- }
-
- @Override
- int getPendingTasks(JobInProgress job) {
- return job.pendingReduces();
- }
-
- @Override
- int getSlotsPerTask(JobInProgress job) {
- return
- job.getJobConf().computeNumSlotsPerReduce(scheduler.getMemSizeForReduceSlot());
- }
-
- @Override
- TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
- return qsi.reduceTSI;
- }
-
- int getNumReservedTaskTrackers(JobInProgress job) {
- return job.getNumReservedTaskTrackersForReduces();
+ TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+ return qsi.getReduceTSC();
}
@Override
@@ -871,33 +644,24 @@
//check if the job supports reduce speculative execution first then
//check if the job has speculative tasks.
return (job.getJobConf().getReduceSpeculativeExecution()) && (
- hasSpeculativeTask(job.getReduceTasks(),
- job.getStatus().reduceProgress(), tts));
+ hasSpeculativeTask(job.getReduceTasks(),
+ tts));
}
}
-
- /** the scheduling mgrs for Map and Reduce tasks */
+
+ /** the scheduling mgrs for Map and Reduce tasks */
protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
- MemoryMatcher memoryMatcher = new MemoryMatcher(this);
+ MemoryMatcher memoryMatcher = new MemoryMatcher();
- /** we keep track of the number of map/reduce slots we saw last */
- private int prevMapClusterCapacity = 0;
- private int prevReduceClusterCapacity = 0;
-
-
static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
protected JobQueuesManager jobQueuesManager;
- protected CapacitySchedulerConf schedConf;
+
/** whether scheduler has started or not */
private boolean started = false;
- final static String JOB_SCHEDULING_INFO_FORMAT_STRING =
- "%s running map tasks using %d map slots. %d additional slots reserved." +
- " %s running reduce tasks using %d reduce slots." +
- " %d additional slots reserved.";
/**
* A clock class - can be mocked out for testing.
*/
@@ -910,10 +674,22 @@
private Clock clock;
private JobInitializationPoller initializationPoller;
- private long memSizeForMapSlotOnJT;
- private long memSizeForReduceSlotOnJT;
- private long limitMaxMemForMapTasks;
- private long limitMaxMemForReduceTasks;
+ class CapacitySchedulerQueueRefresher extends QueueRefresher {
+ @Override
+ void refreshQueues(List<JobQueueInfo> newRootQueues)
+ throws Throwable {
+ if (!started) {
+ String msg =
+ "Capacity Scheduler is not in the 'started' state."
+ + " Cannot refresh queues.";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
+ initializeQueues(newRootQueues, schedConf, true);
+ initializationPoller.refreshQueueInfo(schedConf);
+ }
+ }
public CapacityTaskScheduler() {
this(new Clock());
@@ -921,105 +697,20 @@
// for testing
public CapacityTaskScheduler(Clock clock) {
- this.jobQueuesManager = new JobQueuesManager(this);
+ this.jobQueuesManager = new JobQueuesManager();
this.clock = clock;
}
-
- /** mostly for testing purposes */
- public void setResourceManagerConf(CapacitySchedulerConf conf) {
- this.schedConf = conf;
- }
-
- private void initializeMemoryRelatedConf() {
- //handling @deprecated
- if (conf.get(
- CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY) !=
- null) {
- LOG.warn(
- JobConf.deprecatedString(
- CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY));
- }
-
- //handling @deprecated
- if (conf.get(CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY) !=
- null) {
- LOG.warn(
- JobConf.deprecatedString(
- CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY));
- }
-
- if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) {
- LOG.warn(
- JobConf.deprecatedString(
- JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY));
- }
-
- memSizeForMapSlotOnJT =
- JobConf.normalizeMemoryConfigValue(conf.getLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
- memSizeForReduceSlotOnJT =
- JobConf.normalizeMemoryConfigValue(conf.getLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
-
- //handling @deprecated values
- if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
- LOG.warn(
- JobConf.deprecatedString(
- JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
- " instead use " +JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY+
- " and " + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY
- );
-
- limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
- JobConf.normalizeMemoryConfigValue(
- conf.getLong(
- JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
- if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT &&
- limitMaxMemForMapTasks >= 0) {
- limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
- limitMaxMemForMapTasks /
- (1024 * 1024); //Converting old values in bytes to MB
- }
- } else {
- limitMaxMemForMapTasks =
- JobConf.normalizeMemoryConfigValue(
- conf.getLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
- limitMaxMemForReduceTasks =
- JobConf.normalizeMemoryConfigValue(
- conf.getLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
- }
- LOG.info(String.format("Scheduler configured with "
- + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
- + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
- + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long
- .valueOf(memSizeForReduceSlotOnJT), Long
- .valueOf(limitMaxMemForMapTasks), Long
- .valueOf(limitMaxMemForReduceTasks)));
- }
- long getMemSizeForMapSlot() {
- return memSizeForMapSlotOnJT;
- }
-
- long getMemSizeForReduceSlot() {
- return memSizeForReduceSlotOnJT;
- }
-
- long getLimitMaxMemForMapSlot() {
- return limitMaxMemForMapTasks;
- }
-
- long getLimitMaxMemForReduceSlot() {
- return limitMaxMemForReduceTasks;
+ @Override
+ QueueRefresher getQueueRefresher() {
+ return new CapacitySchedulerQueueRefresher();
}
+ /**
+ * Only for testing.
+ * @param type
+ * @return
+ */
String[] getOrderedQueues(TaskType type) {
if (type == TaskType.MAP) {
return mapScheduler.getOrderedQueues();
@@ -1033,80 +724,144 @@
public synchronized void start() throws IOException {
if (started) return;
super.start();
- // initialize our queues from the config settings
- if (null == schedConf) {
- schedConf = new CapacitySchedulerConf();
- }
- initializeMemoryRelatedConf();
-
+ // Initialize MemoryMatcher
+ MemoryMatcher.initializeMemoryRelatedConf(conf);
+
// read queue info from config file
QueueManager queueManager = taskTrackerManager.getQueueManager();
- Set<String> queues = queueManager.getQueues();
- // Sanity check: there should be at least one queue.
- if (0 == queues.size()) {
- throw new IllegalStateException("System has no queue configured");
- }
- Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
- float totalCapacity = 0.0f;
- for (String queueName: queues) {
- float capacity = schedConf.getCapacity(queueName);
- if(capacity == -1.0) {
- queuesWithoutConfiguredCapacity.add(queueName);
- }else {
- totalCapacity += capacity;
- }
- int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
- // create our QSI and add to our hashmap
- QueueSchedulingInfo qsi = new QueueSchedulingInfo(
- queueName, capacity, ulMin, jobQueuesManager, schedConf.getMaxMapCap(
- queueName), schedConf.getMaxReduceCap(queueName));
- queueInfoMap.put(queueName, qsi);
-
- // create the queues of job objects
- boolean supportsPrio = schedConf.isPrioritySupported(queueName);
- jobQueuesManager.createQueue(queueName, supportsPrio);
-
- SchedulingDisplayInfo schedulingInfo =
- new SchedulingDisplayInfo(queueName, this);
- queueManager.setSchedulerInfo(queueName, schedulingInfo);
-
+ // initialize our queues from the config settings
+ CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
+ try {
+ initializeQueues(queueManager.getRoot().getJobQueueInfo().getChildren(),
+ schedConf, false);
+ } catch (Throwable e) {
+ LOG.error("Couldn't initialize queues because of the excecption : "
+ + StringUtils.stringifyException(e));
+ throw new IOException(e);
}
- float remainingQuantityToAllocate = 100 - totalCapacity;
- float quantityToAllocate =
- remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
- for(String queue: queuesWithoutConfiguredCapacity) {
- QueueSchedulingInfo qsi = queueInfoMap.get(queue);
- qsi.capacityPercent = quantityToAllocate;
- schedConf.setCapacity(queue, quantityToAllocate);
- }
-
- if (totalCapacity > 100.0) {
- throw new IllegalArgumentException("Sum of queue capacities over 100% at "
- + totalCapacity);
- }
-
- // let our mgr objects know about the queues
- mapScheduler.initialize(queueInfoMap);
- reduceScheduler.initialize(queueInfoMap);
-
- // listen to job changes
+
+ // Queues are ready. Now register jobQueuesManager with the JobTracker so as
+ // to listen to job changes
taskTrackerManager.addJobInProgressListener(jobQueuesManager);
//Start thread for initialization
if (initializationPoller == null) {
this.initializationPoller = new JobInitializationPoller(
- jobQueuesManager,schedConf,queues, taskTrackerManager);
+ jobQueuesManager, taskTrackerManager);
}
- initializationPoller.init(queueManager.getQueues(), schedConf);
+ initializationPoller.init(jobQueuesManager.getJobQueueNames(), schedConf);
initializationPoller.setDaemon(true);
initializationPoller.start();
started = true;
- LOG.info("Capacity scheduler initialized " + queues.size() + " queues");
+
+ LOG.info("Capacity scheduler started successfully");
}
-
+
+ /**
+ * Read the configuration and initialize the queues. This operation should be
+ * done only when either the scheduler is starting or a request is received
+ * from {@link QueueManager} to refresh the queue configuration.
+ *
+ * <p>
+ *
+ * Even in case of refresh, we do not explicitly destroy AbstractQueue items,
+ * or the info maps, they will be automatically garbage-collected.
+ *
+ * <p>
+ *
+ * We don't explicitly lock the scheduler completely. This method is called in
+ * two situations. 1) When the scheduler is starting. During this time, the lock
+ * sequence is JT->scheduler and so we don't need any more locking here. 2)
+ * When refresh is issued to {@link QueueManager}. When this happens, parallel
+ * refreshes are guarded by {@link QueueManager} itself by taking its lock.
+ *
+ * @param newRootQueues
+ * @param schedConf
+ * @param refreshingQueues
+ * @throws Throwable
+ */
+ private void initializeQueues(List<JobQueueInfo> newRootQueues,
+ CapacitySchedulerConf schedConf, boolean refreshingQueues)
+ throws Throwable {
+
+ if (newRootQueues == null) {
+ throw new IOException(
+ "Cannot initialize the queues with null root-queues!");
+ }
+
+ // Sanity check: there should be at least one queue.
+ if (0 == newRootQueues.size()) {
+ throw new IllegalStateException("System has no queue configured!");
+ }
+
+ // Create a new queue-hierarchy builder and try loading the complete
+ // hierarchy of queues.
+ AbstractQueue newRootAbstractQueue;
+ try {
+ newRootAbstractQueue =
+ new QueueHierarchyBuilder().createHierarchy(newRootQueues, schedConf);
+
+ } catch (Throwable e) {
+ LOG.error("Exception while tryign to (re)initializing queues : "
+ + StringUtils.stringifyException(e));
+ LOG.info("(Re)initializing the queues with the new configuration "
+ + "failed, so keeping the old configuration.");
+ throw e;
+ }
+
+ // New configuration is successfully validated and applied, set the new
+ // configuration to the current queue-hierarchy.
+
+ if (refreshingQueues) {
+ // Scheduler is being refreshed.
+
+ // Going to commit the changes to the hierarchy. Lock the scheduler.
+ synchronized (this) {
+ AbstractQueueComparator comparator = new AbstractQueueComparator();
+ this.root.sort(comparator);
+ newRootAbstractQueue.sort(comparator);
+ root.validateAndCopyQueueContexts(newRootAbstractQueue);
+ }
+ } else {
+ // Scheduler is just starting.
+
+ this.root = newRootAbstractQueue;
+
+ // JobQueue objects are created. Inform the JobQueuesManager so that it
+ // can track the running/waiting jobs. JobQueuesManager is still not added
+ // as a listener to JobTracker, so no locking needed.
+ addJobQueuesToJobQueuesManager();
+ }
+
+ List<AbstractQueue> allQueues = new ArrayList<AbstractQueue>();
+ allQueues.addAll(getRoot().getDescendantContainerQueues());
+ allQueues.addAll(getRoot().getDescendentJobQueues());
+ for (AbstractQueue queue : allQueues) {
+ if (!refreshingQueues) {
+ // Scheduler is just starting, create the display info also
+ createDisplayInfo(taskTrackerManager.getQueueManager(), queue.getName());
+ }
+
+ // QueueSchedulingContext objects are created/have changed. Put them
+ // (back) in the queue-info so as to be consumed by the UI.
+ addToQueueInfoMap(queue.getQueueSchedulingContext());
+ }
+ }
+
+ /**
+ * Inform the {@link JobQueuesManager} about the newly constructed
+ * {@link JobQueue}s.
+ */
+ private void addJobQueuesToJobQueuesManager() {
+ List<AbstractQueue> allJobQueues = getRoot().getDescendentJobQueues();
+ for (AbstractQueue jobQ : allJobQueues) {
+ jobQueuesManager.addQueue((JobQueue)jobQ);
+ }
+ }
+
/** mostly for testing purposes */
void setInitializationPoller(JobInitializationPoller p) {
this.initializationPoller = p;
@@ -1133,117 +888,36 @@
* provided for the test classes
* lets you update the QSI objects and sorted collections
*/
- void updateQSIInfoForTests() {
+ void updateContextInfoForTests() {
ClusterStatus c = taskTrackerManager.getClusterStatus();
int mapClusterCapacity = c.getMaxMapTasks();
int reduceClusterCapacity = c.getMaxReduceTasks();
// update the QSI objects
- updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
- mapScheduler.updateCollectionOfQSIs();
- reduceScheduler.updateCollectionOfQSIs();
+ updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
+ mapScheduler.scheduler.root.sort(mapScheduler.queueComparator);
+ reduceScheduler.scheduler.root.sort(reduceScheduler.queueComparator);
}
/**
- * Update individual QSI objects.
+ * Update individual QSC objects.
* We don't need exact information for all variables, just enough for us
* to make scheduling decisions. For example, we don't need an exact count
* of numRunningTasks. Once we count upto the grid capacity, any
* number beyond that will make no difference.
*
**/
- private synchronized void updateQSIObjects(int mapClusterCapacity,
+ private synchronized void updateContextObjects(int mapClusterCapacity,
int reduceClusterCapacity) {
- // if # of slots have changed since last time, update.
- // First, compute whether the total number of TT slots have changed
- for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
- // compute new capacities, if TT slots have changed
- if (mapClusterCapacity != prevMapClusterCapacity) {
- qsi.mapTSI.setCapacity((int)
- (qsi.capacityPercent*mapClusterCapacity/100));
- }
- if (reduceClusterCapacity != prevReduceClusterCapacity) {
- qsi.reduceTSI.setCapacity((int)
- (qsi.capacityPercent*reduceClusterCapacity/100));
- }
- // reset running/pending tasks, tasks per user
- qsi.mapTSI.resetTaskVars();
- qsi.reduceTSI.resetTaskVars();
- // update stats on running jobs
- for (JobInProgress j:
- jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
- if (j.getStatus().getRunState() != JobStatus.RUNNING) {
- continue;
- }
+ root.update(mapClusterCapacity,reduceClusterCapacity);
- int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
- int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
- int numRunningMapSlots =
- numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
- int numRunningReduceSlots =
- numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
- int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
- int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
- int numReservedMapSlotsForThisJob =
- (mapScheduler.getNumReservedTaskTrackers(j) *
- mapScheduler.getSlotsPerTask(j));
- int numReservedReduceSlotsForThisJob =
- (reduceScheduler.getNumReservedTaskTrackers(j) *
- reduceScheduler.getSlotsPerTask(j));
- j.setSchedulingInfo(
- String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
- Integer.valueOf(numMapsRunningForThisJob),
- Integer.valueOf(numRunningMapSlots),
- Integer.valueOf(numReservedMapSlotsForThisJob),
- Integer.valueOf(numReducesRunningForThisJob),
- Integer.valueOf(numRunningReduceSlots),
- Integer.valueOf(numReservedReduceSlotsForThisJob)));
- qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
- qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
- qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
- qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob;
- Integer i =
- qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
- qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
- Integer.valueOf(i.intValue() + numMapSlotsForThisJob));
- i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
- qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
- Integer.valueOf(i.intValue() + numReduceSlotsForThisJob));
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
- + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
- + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
- + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
- .getJobID().toString(), Integer
- .valueOf(numMapsRunningForThisJob), Integer
- .valueOf(numMapSlotsForThisJob), Integer
- .valueOf(numReducesRunningForThisJob), Integer
- .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
- .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
- .valueOf(j.failedMapTasks),
- Integer.valueOf(j.failedReduceTasks), Integer
- .valueOf(j.speculativeMapTasks), Integer
- .valueOf(j.speculativeReduceTasks), Integer
- .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
- }
-
- /*
- * it's fine walking down the entire list of running jobs - there
- * probably will not be many, plus, we may need to go through the
- * list to compute numSlotsOccupiedByUser. If this is expensive, we
- * can keep a list of running jobs per user. Then we only need to
- * consider the first few jobs per user.
- */
- }
- }
-
- prevMapClusterCapacity = mapClusterCapacity;
- prevReduceClusterCapacity = reduceClusterCapacity;
}
/*
* The grand plan for assigning a task.
- * First, decide whether a Map or Reduce task should be given to a TT
- * (if the TT can accept either).
+ * Always assigns 1 reduce and 1 map, if sufficient slots are
+ * available for each of the types.
+ * If not, then whichever type of slot is available, that type of task is
+ * assigned.
* Next, pick a queue. We only look at queues that need a slot. Among these,
* we first look at queues whose (# of running tasks)/capacity is the least.
* Next, pick a job in a queue. we pick the job at the front of the queue
@@ -1257,12 +931,12 @@
TaskLookupResult tlr;
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
+ List<Task> result = new ArrayList<Task>();
/*
- * If TT has Map and Reduce slot free, we need to figure out whether to
- * give it a Map or Reduce task.
- * Number of ways to do this. For now, base decision on how much is needed
- * versus how much is used (default to Map, if equal).
+ * If the TT has Map and Reduce slots free, we assign 1 map and 1 reduce.
+ * We base the decision on how much is needed
+ * versus how much is used.
*/
ClusterStatus c = taskTrackerManager.getClusterStatus();
int mapClusterCapacity = c.getMaxMapTasks();
@@ -1271,7 +945,8 @@
int currentMapSlots = taskTrackerStatus.countOccupiedMapSlots();
int maxReduceSlots = taskTrackerStatus.getMaxReduceSlots();
int currentReduceSlots = taskTrackerStatus.countOccupiedReduceSlots();
- LOG.debug("TT asking for task, max maps=" + taskTrackerStatus.getMaxMapSlots() +
+ LOG.debug("TT asking for task, max maps="
+ + taskTrackerStatus.getMaxMapSlots() +
", run maps=" + taskTrackerStatus.countMapTasks() + ", max reds=" +
taskTrackerStatus.getMaxReduceSlots() + ", run reds=" +
taskTrackerStatus.countReduceTasks() + ", map cap=" +
@@ -1279,145 +954,52 @@
reduceClusterCapacity);
/*
- * update all our QSI objects.
- * This involves updating each qsi structure. This operation depends
+ * update all our QSC objects.
+ * This involves updating each QSC structure. This operation depends
* on the number of running jobs in a queue, and some waiting jobs. If it
* becomes expensive, do it once every few heartbeats only.
*/
- updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+ updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
// make sure we get our map or reduce scheduling object to update its
- // collection of QSI objects too.
+ // collection of QSC objects too.
- if ((maxReduceSlots - currentReduceSlots) >
- (maxMapSlots - currentMapSlots)) {
- // get a reduce task first
- reduceScheduler.updateCollectionOfQSIs();
+ if (maxReduceSlots > currentReduceSlots) {
+ //reduce slot available , try to get a
+ //reduce task
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
- // found a task; return
- return Collections.singletonList(tlr.getTask());
- }
- // if we didn't get any, look at map tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
- == tlr.getLookUpStatus() ||
- TaskLookupResult.LookUpStatus.NO_TASK_FOUND
- == tlr.getLookUpStatus())
- && (maxMapSlots > currentMapSlots)) {
- mapScheduler.updateCollectionOfQSIs();
- tlr = mapScheduler.assignTasks(taskTracker);
- if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
- tlr.getLookUpStatus()) {
- return Collections.singletonList(tlr.getTask());
- }
+ result.add(tlr.getTask());
}
}
- else {
- // get a map task first
- mapScheduler.updateCollectionOfQSIs();
+
+ if(maxMapSlots > currentMapSlots) {
+ //map slot available , try to get a map task
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
- // found a task; return
- return Collections.singletonList(tlr.getTask());
+ result.add(tlr.getTask());
}
- // if we didn't get any, look at reduce tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
- == tlr.getLookUpStatus()
- || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
- == tlr.getLookUpStatus())
- && (maxReduceSlots > currentReduceSlots)) {
- reduceScheduler.updateCollectionOfQSIs();
- tlr = reduceScheduler.assignTasks(taskTracker);
- if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
- tlr.getLookUpStatus()) {
- return Collections.singletonList(tlr.getTask());
- }
- }
- }
-
- return null;
- }
-
- // called when a job is added
- synchronized void jobAdded(JobInProgress job) throws IOException {
- QueueSchedulingInfo qsi =
- queueInfoMap.get(job.getProfile().getQueueName());
- // qsi shouldn't be null
- // update user-specific info
- Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
- if (null == i) {
- i = 1;
- // set the count for running tasks to 0
- qsi.mapTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
- Integer.valueOf(0));
- qsi.reduceTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
- Integer.valueOf(0));
- }
- else {
- i++;
}
- qsi.numJobsByUser.put(job.getProfile().getUser(), i);
- // setup scheduler specific job information
- preInitializeJob(job);
-
- LOG.debug("Job " + job.getJobID().toString() + " is added under user "
- + job.getProfile().getUser() + ", user now has " + i + " jobs");
+ return (result.isEmpty()) ? null : result;
}
- /**
- * Setup {@link CapacityTaskScheduler} specific information prior to
- * job initialization.
- */
- void preInitializeJob(JobInProgress job) {
- JobConf jobConf = job.getJobConf();
-
- // Compute number of slots required to run a single map/reduce task
- int slotsPerMap = 1;
- int slotsPerReduce = 1;
- if (memoryMatcher.isSchedulingBasedOnMemEnabled()) {
- slotsPerMap = jobConf.computeNumSlotsPerMap(getMemSizeForMapSlot());
- slotsPerReduce =
- jobConf.computeNumSlotsPerReduce(getMemSizeForReduceSlot());
- }
- job.setNumSlotsPerMap(slotsPerMap);
- job.setNumSlotsPerReduce(slotsPerReduce);
- }
-
- // called when a job completes
- synchronized void jobCompleted(JobInProgress job) {
- QueueSchedulingInfo qsi =
- queueInfoMap.get(job.getProfile().getQueueName());
- // qsi shouldn't be null
- // update numJobsByUser
- LOG.debug("JOb to be removed for user " + job.getProfile().getUser());
- Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
- i--;
- if (0 == i.intValue()) {
- qsi.numJobsByUser.remove(job.getProfile().getUser());
- // remove job footprint from our TSIs
- qsi.mapTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
- qsi.reduceTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
- LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
- }
- else {
- qsi.numJobsByUser.put(job.getProfile().getUser(), i);
- LOG.debug("User still has " + i + " jobs, number of users = "
- + qsi.numJobsByUser.size());
- }
- }
@Override
public synchronized Collection<JobInProgress> getJobs(String queueName) {
Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
- Collection<JobInProgress> runningJobs =
- jobQueuesManager.getRunningJobQueue(queueName);
+ JobQueue jobQueue = jobQueuesManager.getJobQueue(queueName);
+ if (jobQueue == null) {
+ return jobCollection;
+ }
+ Collection<JobInProgress> runningJobs =
+ jobQueue.getRunningJobs();
if (runningJobs != null) {
jobCollection.addAll(runningJobs);
}
Collection<JobInProgress> waitingJobs =
- jobQueuesManager.getWaitingJobs(queueName);
+ jobQueue.getWaitingJobs();
Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
if(waitingJobs != null) {
tempCollection.addAll(waitingJobs);
@@ -1433,13 +1015,51 @@
return initializationPoller;
}
- synchronized String getDisplayInfo(String queueName) {
- QueueSchedulingInfo qsi = queueInfoMap.get(queueName);
- if (null == qsi) {
+ private synchronized String getDisplayInfo(String queueName) {
+ QueueSchedulingContext qsi = queueInfoMap.get(queueName);
+ if (null == qsi) {
return null;
}
return qsi.toString();
}
-}
+ private synchronized void addToQueueInfoMap(QueueSchedulingContext qsc) {
+ queueInfoMap.put(qsc.getQueueName(), qsc);
+ }
+ /**
+ * Create the scheduler information and set it in the {@link QueueManager}.
+ * This should only be called when the scheduler is starting.
+ *
+ * @param queueManager
+ * @param queueName
+ */
+ private void createDisplayInfo(QueueManager queueManager, String queueName) {
+ if (queueManager != null) {
+ SchedulingDisplayInfo schedulingInfo =
+ new SchedulingDisplayInfo(queueName, this);
+ queueManager.setSchedulerInfo(queueName, schedulingInfo);
+ }
+ }
+
+
+ /**
+ * Used for testing purposes.
+ * Returns the root queue.
+ * @return
+ */
+ AbstractQueue getRoot() {
+ return this.root;
+ }
+
+
+ /**
+ * This is used for testing purposes only.
+ * Don't use this method.
+ * @param rt
+ */
+ void setRoot(AbstractQueue rt) {
+ this.root = rt;
+ }
+
+}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Sat Nov 28 20:26:01 2009
@@ -134,7 +134,7 @@
if (job == null) {
continue;
}
- LOG.info("Initializing job : " + job.getJobID() + " in Queue "
+ LOG.info("Initializing job : " + job.getJobID() + " in AbstractQueue "
+ job.getProfile().getQueueName() + " For user : "
+ job.getProfile().getUser());
if (startIniting) {
@@ -246,9 +246,9 @@
*/
private HashMap<String, JobInitializationThread> threadsToQueueMap;
- public JobInitializationPoller(JobQueuesManager mgr,
- CapacitySchedulerConf rmConf, Set<String> queue,
- TaskTrackerManager ttm) {
+ public JobInitializationPoller(
+ JobQueuesManager mgr,
+ TaskTrackerManager ttm) {
initializedJobs = new HashMap<JobID,JobInProgress>();
jobQueues = new HashMap<String, QueueInfo>();
this.jobQueueManager = mgr;
@@ -265,20 +265,7 @@
void init(Set<String> queues,
CapacitySchedulerConf capacityConf) {
- for (String queue : queues) {
- int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
- int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
- int maxJobsPerUserToInitialize = capacityConf
- .getMaxJobsPerUserToInitialize(queue);
- QueueInfo qi = new QueueInfo(queue, maxUsersToInitialize,
- maxJobsPerUserToInitialize);
- jobQueues.put(queue, qi);
- }
- sleepInterval = capacityConf.getSleepInterval();
- poolSize = capacityConf.getMaxWorkerThreads();
- if (poolSize > queues.size()) {
- poolSize = queues.size();
- }
+ setupJobInitializerConfiguration(queues, capacityConf);
assignThreadsToQueues();
Collection<JobInitializationThread> threads = threadsToQueueMap.values();
for (JobInitializationThread t : threads) {
@@ -290,6 +277,63 @@
}
/**
+ * Initialize the configuration of the JobInitializer as well as of the specific
+ * queues.
+ *
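+ * @param queues names of the queues for which a QueueInfo entry is created
+ * @param schedulerConf configuration supplying per-queue limits, sleep interval and worker thread count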
+ */
+ private void setupJobInitializerConfiguration(Set<String> queues,
+ CapacitySchedulerConf schedulerConf) {
+ for (String queue : queues) {
+ int maxUsersToInitialize = getMaxUsersToInit(schedulerConf, queue);
+ int maxJobsPerUserToInitialize =
+ schedulerConf.getMaxJobsPerUserToInitialize(queue);
+ QueueInfo qi =
+ new QueueInfo(queue, maxUsersToInitialize,
+ maxJobsPerUserToInitialize);
+ jobQueues.put(queue, qi);
+ }
+ sleepInterval = schedulerConf.getSleepInterval();
+ poolSize = schedulerConf.getMaxWorkerThreads();
+ if (poolSize > queues.size()) {
+ poolSize = queues.size();
+ }
+ }
+
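+ /**
+ * Compute the maximum number of users whose jobs may be initialized for a queue.
+ * @param schedulerConf configuration from which the queue's minimum user limit percent is read
+ * @param queue name of the queue
+ * @return 100 divided by the minimum user limit percent, plus MAX_ADDITIONAL_USERS_TO_INIT
+ */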
+ private int getMaxUsersToInit(CapacitySchedulerConf schedulerConf,
+ String queue) {
+ int userlimit = schedulerConf.getMinimumUserLimitPercent(queue);
+ return (100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT;
+ }
+
+ /**
+ * Refresh the Scheduler configuration cached with the initializer. This
+ * should be called only by
+ * {@link CapacityTaskScheduler.CapacitySchedulerQueueRefresher#refreshQueues()}.
+ * The cached configuration is currently used only by the main thread in the
+ * initializer, so any updates are picked up automatically by subsequent
+ * iterations of the main thread.
+ */
+ void refreshQueueInfo(CapacitySchedulerConf schedulerConf) {
+ for (String queue : jobQueues.keySet()) {
+ QueueInfo queueInfo = jobQueues.get(queue);
+ synchronized (queueInfo) {
+ queueInfo.maxUsersAllowedToInitialize =
+ getMaxUsersToInit(schedulerConf, queue);
+ queueInfo.maxJobsPerUserToInitialize =
+ schedulerConf.getMaxJobsPerUserToInitialize(queue);
+ }
+ }
+ }
+
+ /**
* This is main thread of initialization poller, We essentially do
* following in the main threads:
*
@@ -343,7 +387,7 @@
private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
for (JobInProgress job : jobsToInitialize) {
LOG.info("Passing to Initializer Job Id :" + job.getJobID()
- + " User: " + job.getProfile().getUser() + " Queue : "
+ + " User: " + job.getProfile().getUser() + " AbstractQueue : "
+ job.getProfile().getQueueName());
}
}
@@ -434,13 +478,17 @@
ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
// use the configuration parameter which is configured for the particular
// queue.
- int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
- int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
+ int maximumUsersAllowedToInitialize;
+ int maxJobsPerUserAllowedToInitialize;
+ synchronized (qi) {
+ maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
+ maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
+ }
int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
* maxJobsPerUserAllowedToInitialize;
int countOfJobsInitialized = 0;
HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
- Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
+ Collection<JobInProgress> jobs = jobQueueManager.getJobQueue(queue).getWaitingJobs();
/*
* Walk through the collection of waiting jobs.
* We maintain a map of jobs that have already been initialized. If a
@@ -536,7 +584,7 @@
LOG.info("Removing scheduled jobs from waiting queue"
+ job.getJobID());
jobsIterator.remove();
- jobQueueManager.removeJobFromWaitingQueue(job);
+ jobQueueManager.getJobQueue(job).removeWaitingJob(new JobSchedulingInfo(job));
continue;
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Sat Nov 28 20:26:01 2009
@@ -18,270 +18,85 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.Map;
-import java.util.TreeMap;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
-import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
/**
* A {@link JobInProgressListener} that maintains the jobs being managed in
- * one or more queues.
+ * one or more queues.
*/
class JobQueuesManager extends JobInProgressListener {
- /*
- * If a queue supports priorities, jobs must be
- * sorted on priorities, and then on their start times (technically,
- * their insertion time.
- * If a queue doesn't support priorities, jobs are
- * sorted based on their start time.
- */
-
- // comparator for jobs in queues that don't support priorities
- private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
- = new Comparator<JobSchedulingInfo>() {
- public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
- // the job that started earlier wins
- if (o1.getStartTime() < o2.getStartTime()) {
- return -1;
- } else {
- return (o1.getStartTime() == o2.getStartTime()
- ? o1.getJobID().compareTo(o2.getJobID())
- : 1);
- }
- }
- };
-
- // class to store queue info
- private static class QueueInfo {
-
- // whether the queue supports priorities
- boolean supportsPriorities;
- Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
- Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
-
- public Comparator<JobSchedulingInfo> comparator;
-
- QueueInfo(boolean prio) {
- this.supportsPriorities = prio;
- if (supportsPriorities) {
- // use the default priority-aware comparator
- comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
- }
- else {
- comparator = STARTTIME_JOB_COMPARATOR;
- }
- waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
- runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
- }
-
- Collection<JobInProgress> getWaitingJobs() {
- synchronized (waitingJobs) {
- return Collections.unmodifiableCollection(
- new LinkedList<JobInProgress>(waitingJobs.values()));
- }
- }
-
- Collection<JobInProgress> getRunningJobs() {
- synchronized (runningJobs) {
- return Collections.unmodifiableCollection(
- new LinkedList<JobInProgress>(runningJobs.values()));
- }
- }
-
- void addRunningJob(JobInProgress job) {
- synchronized (runningJobs) {
- runningJobs.put(new JobSchedulingInfo(job),job);
- }
- }
-
- JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
- synchronized (runningJobs) {
- return runningJobs.remove(jobInfo);
- }
- }
-
- JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
- synchronized (waitingJobs) {
- return waitingJobs.remove(schedInfo);
- }
- }
-
- void addWaitingJob(JobInProgress job) {
- synchronized (waitingJobs) {
- waitingJobs.put(new JobSchedulingInfo(job), job);
- }
- }
-
- int getWaitingJobCount() {
- synchronized (waitingJobs) {
- return waitingJobs.size();
- }
- }
-
- }
-
// we maintain a hashmap of queue-names to queue info
- private Map<String, QueueInfo> jobQueues =
- new HashMap<String, QueueInfo>();
+ private Map<String, JobQueue> jobQueues =
+ new HashMap<String, JobQueue>();
private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
- private CapacityTaskScheduler scheduler;
-
- JobQueuesManager(CapacityTaskScheduler s) {
- this.scheduler = s;
- }
-
- /**
- * create an empty queue with the default comparator
- * @param queueName The name of the queue
- * @param supportsPriotities whether the queue supports priorities
- */
- public void createQueue(String queueName, boolean supportsPriotities) {
- jobQueues.put(queueName, new QueueInfo(supportsPriotities));
- }
-
- /**
- * Returns the queue of running jobs associated with the name
- */
- public Collection<JobInProgress> getRunningJobQueue(String queueName) {
- return jobQueues.get(queueName).getRunningJobs();
+
+ JobQueuesManager() {
}
-
+
/**
- * Returns the queue of waiting jobs associated with queue name.
+ * Add the given queue to the map of queue name to job-queues.
*
+ * @param queue The job-queue
*/
- Collection<JobInProgress> getWaitingJobs(String queueName) {
- return jobQueues.get(queueName).getWaitingJobs();
+ public void addQueue(JobQueue queue) {
+ jobQueues.put(queue.getName(),queue);
}
-
+
@Override
public void jobAdded(JobInProgress job) throws IOException {
- LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
+ LOG.info("Job " + job.getJobID() + " submitted to queue "
+ + job.getProfile().getQueueName());
// add job to the right queue
- QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+ JobQueue qi = getJobQueue(job.getProfile().getQueueName());
if (null == qi) {
// job was submitted to a queue we're not aware of
- LOG.warn("Invalid queue " + job.getProfile().getQueueName() +
- " specified for job" + job.getProfile().getJobID() +
+ LOG.warn(
+ "Invalid queue " + job.getProfile().getQueueName() +
+ " specified for job " + job.getProfile().getJobID() +
". Ignoring job.");
return;
}
- // add job to waiting queue. It will end up in the right place,
- // based on priority.
- qi.addWaitingJob(job);
// let scheduler know.
- scheduler.jobAdded(job);
+ qi.jobAdded(job);
}
- /*
- * Method removes the jobs from both running and waiting job queue in
- * job queue manager.
- */
- private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo,
- QueueInfo qi) {
- LOG.info("Job " + job.getJobID().toString() + " submitted to queue "
- + job.getProfile().getQueueName() + " has completed");
- //remove jobs from both queue's a job can be in
- //running and waiting queue at the same time.
- qi.removeRunningJob(oldInfo);
- qi.removeWaitingJob(oldInfo);
- // let scheduler know
- scheduler.jobCompleted(job);
- }
-
// Note that job is removed when the job completes i.e in jobUpated()
@Override
- public void jobRemoved(JobInProgress job) {}
-
- // This is used to reposition a job in the queue. A job can get repositioned
- // because of the change in the job priority or job start-time.
- private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo,
- QueueInfo qi) {
-
- if(qi.removeWaitingJob(oldInfo) != null) {
- qi.addWaitingJob(job);
- }
- if(qi.removeRunningJob(oldInfo) != null) {
- qi.addRunningJob(job);
- }
- }
-
- // This is used to move a job from the waiting queue to the running queue.
- private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo,
- QueueInfo qi) {
- // Removing of the job from job list is responsibility of the
- //initialization poller.
- // Add the job to the running queue
- qi.addRunningJob(job);
- }
-
- // Update the scheduler as job's state has changed
- private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
- JobInProgress job = event.getJobInProgress();
- JobSchedulingInfo oldJobStateInfo =
- new JobSchedulingInfo(event.getOldStatus());
- // Check if the ordering of the job has changed
- // For now priority and start-time can change the job ordering
- if (event.getEventType() == EventType.PRIORITY_CHANGED
- || event.getEventType() == EventType.START_TIME_CHANGED) {
- // Make a priority change
- reorderJobs(job, oldJobStateInfo, qi);
- } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
- // Check if the job is complete
- int runState = job.getStatus().getRunState();
- if (runState == JobStatus.SUCCEEDED
- || runState == JobStatus.FAILED
- || runState == JobStatus.KILLED) {
- jobCompleted(job, oldJobStateInfo, qi);
- } else if (runState == JobStatus.RUNNING) {
- makeJobRunning(job, oldJobStateInfo, qi);
- }
- }
+ public void jobRemoved(JobInProgress job) {
}
-
+
+
@Override
public void jobUpdated(JobChangeEvent event) {
JobInProgress job = event.getJobInProgress();
- QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
- if (null == qi) {
- // can't find queue for job. Shouldn't happen.
- LOG.warn("Could not find queue " + job.getProfile().getQueueName() +
- " when updating job " + job.getProfile().getJobID());
- return;
- }
-
- // Check if this is the status change
- if (event instanceof JobStatusChangeEvent) {
- jobStateChanged((JobStatusChangeEvent)event, qi);
- }
- }
-
- void removeJobFromWaitingQueue(JobInProgress job) {
- String queue = job.getProfile().getQueueName();
- QueueInfo qi = jobQueues.get(queue);
- qi.removeWaitingJob(new JobSchedulingInfo(job));
+ JobQueue qi = getJobQueue(job.getProfile().getQueueName());
+ qi.jobUpdated(event);
+
}
-
+
Comparator<JobSchedulingInfo> getComparator(String queue) {
- return jobQueues.get(queue).comparator;
+ return getJobQueue(queue).comparator;
}
-
- int getWaitingJobCount(String queue) {
- QueueInfo qi = jobQueues.get(queue);
- return qi.getWaitingJobCount();
+
+
+ public JobQueue getJobQueue(JobInProgress jip){
+ return getJobQueue(jip.getProfile().getQueueName());
+ }
+
+ JobQueue getJobQueue(String name) {
+ return jobQueues.get(name);
}
- boolean doesQueueSupportPriorities(String queueName) {
- return jobQueues.get(queueName).supportsPriorities;
+ public Set<String> getJobQueueNames() {
+ return jobQueues.keySet();
}
}