You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/11/13 14:59:05 UTC
svn commit: r713723 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Author: yhemanth
Date: Thu Nov 13 05:59:04 2008
New Revision: 713723
URL: http://svn.apache.org/viewvc?rev=713723&view=rev
Log:
HADOOP-4471. Sort running jobs by priority in the capacity scheduler. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=713723&r1=713722&r2=713723&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Nov 13 05:59:04 2008
@@ -1118,7 +1118,10 @@
HADOOP-4595. Fixes two race conditions - one to do with updating free slot count,
and another to do with starting the MapEventsFetcher thread. (ddas)
- HADOOP-4552. Fix a deadlock in RPC server. (Raghu Angadi)
+ HADOOP-4552. Fix a deadlock in RPC server. (Raghu Angadi)
+
+ HADOOP-4471. Sort running jobs by priority in the capacity scheduler.
+ (Amar Kamat via yhemanth)
Release 0.18.3 - Unreleased
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=713723&r1=713722&r2=713723&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Thu Nov 13 05:59:04 2008
@@ -20,7 +20,6 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
@@ -36,13 +35,11 @@
class JobQueuesManager extends JobInProgressListener {
/*
- * If a queue supports priorities, waiting jobs must be
+ * If a queue supports priorities, jobs must be
* sorted on priorities, and then on their start times (technically,
* their insertion time.
- * If a queue doesn't support priorities, waiting jobs are
+ * If a queue doesn't support priorities, jobs are
* sorted based on their start time.
- * Running jobs are not sorted. A job that started running earlier
- * is ahead in the queue, so insertion should be at the tail.
*/
// comparator for jobs in queues that don't support priorities
@@ -65,12 +62,8 @@
// whether the queue supports priorities
boolean supportsPriorities;
- // maintain separate structures for running & waiting jobs. This we do
- // mainly because when a new job is added, it cannot superceede a running
- // job, even though the latter may be a lower priority. If this is ever
- // changed, we may get by with one collection.
- Map<JobSchedulingInfo, JobInProgress> waitingJobs;
- Collection<JobInProgress> runningJobs;
+ Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
+ Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
QueueInfo(boolean prio) {
this.supportsPriorities = prio;
@@ -79,12 +72,16 @@
this.waitingJobs =
new TreeMap<JobSchedulingInfo, JobInProgress>(
JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
+ this.runningJobs =
+ new TreeMap<JobSchedulingInfo, JobInProgress>(
+ JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
}
else {
this.waitingJobs =
new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
+ this.runningJobs =
+ new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
}
- this.runningJobs = new LinkedList<JobInProgress>();
}
}
@@ -112,7 +109,7 @@
* Returns the queue of running jobs associated with the name
*/
public Collection<JobInProgress> getRunningJobQueue(String queueName) {
- return jobQueues.get(queueName).runningJobs;
+ return jobQueues.get(queueName).runningJobs.values();
}
/**
@@ -146,7 +143,7 @@
LOG.info("Job " + job.getJobID().toString() + " submitted to queue "
+ job.getProfile().getQueueName() + " has completed");
// job could be in running or waiting queue
- if (!qi.runningJobs.remove(job)) {
+ if (qi.runningJobs.remove(oldInfo) == null) {
qi.waitingJobs.remove(oldInfo);
}
// let scheduler know
@@ -162,13 +159,14 @@
private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo,
QueueInfo qi) {
+ JobSchedulingInfo newInfo = new JobSchedulingInfo(job);
if (qi.waitingJobs.remove(oldInfo) == null) {
- qi.runningJobs.remove(job);
+ qi.runningJobs.remove(oldInfo);
// Add back to the running queue
- qi.runningJobs.add(job);
+ qi.runningJobs.put(newInfo, job);
} else {
// Add back to the waiting queue
- qi.waitingJobs.put(new JobSchedulingInfo(job), job);
+ qi.waitingJobs.put(newInfo, job);
}
}
@@ -179,7 +177,7 @@
qi.waitingJobs.remove(oldInfo);
// Add the job to the running queue
- qi.runningJobs.add(job);
+ qi.runningJobs.put(new JobSchedulingInfo(job), job);
}
// Update the scheduler as job's state has changed
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=713723&r1=713722&r2=713723&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Nov 13 05:59:04 2008
@@ -501,72 +501,34 @@
FakeJobInProgress fjob2 =
submitJob(JobStatus.PREP, 1, 0, "default", "user");
- // check if the job is in the waiting queue
- JobInProgress[] jobs =
- scheduler.jobQueuesManager.getWaitingJobQueue("default")
- .toArray(new JobInProgress[0]);
- assertTrue("Waiting queue doesnt contain queued job #1 in right order",
- jobs[0].getJobID().equals(fjob1.getJobID()));
- assertTrue("Waiting queue doesnt contain queued job #2 in right order",
- jobs[1].getJobID().equals(fjob2.getJobID()));
-
- // I. Check the start-time change
- // Change job2 start-time and check if job2 bumps up in the queue
- taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
-
- jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default")
- .toArray(new JobInProgress[0]);
- assertTrue("Start time change didnt not work as expected for job #2",
- jobs[0].getJobID().equals(fjob2.getJobID()));
- assertTrue("Start time change didnt not work as expected for job #1",
- jobs[1].getJobID().equals(fjob1.getJobID()));
-
- // check if the queue is fine
- assertEquals("Start-time change garbled the waiting queue",
- 2, scheduler.getJobs("default").size());
-
- // II. Change job priority change
- // Bump up job1's priority and make sure job1 bumps up in the queue
- taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
-
- // Check if the priority changes are reflected
- jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default")
- .toArray(new JobInProgress[0]);
- assertTrue("Priority change didnt not work as expected for job #1",
- jobs[0].getJobID().equals(fjob1.getJobID()));
- assertTrue("Priority change didnt not work as expected for job #2",
- jobs[1].getJobID().equals(fjob2.getJobID()));
+ // test if changing the job priority/start-time works as expected in the
+ // waiting queue
+ testJobOrderChange(fjob1, fjob2, true);
- // check if the queue is fine
- assertEquals("Priority change has garbled the waiting queue",
- 2, scheduler.getJobs("default").size());
+ // Init the jobs
+ // simulate the case where the job with a lower priority becomes running
+ // first (may be because of the setup tasks).
- // Create an event
- JobChangeEvent event = initTasksAndReportEvent(fjob1);
+ // init the lower ranked job first
+ JobChangeEvent event = initTasksAndReportEvent(fjob2);
// inform the scheduler
scheduler.jobQueuesManager.jobUpdated(event);
- // waiting queue
- Collection<JobInProgress> wqueue =
- scheduler.jobQueuesManager.getWaitingJobQueue("default");
-
- // check if the job is not in the waiting queue
- assertFalse("Waiting queue contains running/inited job",
- wqueue.contains(fjob1));
+ // init the higher ordered job later
+ event = initTasksAndReportEvent(fjob1);
- // check if the waiting queue is fine
- assertEquals("Waiting queue is garbled on job init", 1, wqueue.size());
-
- Collection<JobInProgress> rqueue =
- scheduler.jobQueuesManager.getRunningJobQueue("default");
-
- // check if the job is in the running queue
- assertTrue("Running queue doesnt contain running/inited job",
- rqueue.contains(fjob1));
+ // inform the scheduler
+ scheduler.jobQueuesManager.jobUpdated(event);
- // check if the running queue is fine
- assertEquals("Running queue is garbled upon init", 1, rqueue.size());
+ // check if the jobs are missing from the waiting queue
+ assertEquals("Waiting queue is garbled on job init", 0,
+ scheduler.jobQueuesManager.getWaitingJobQueue("default")
+ .size());
+
+ // test if changing the job priority/start-time works as expected in the
+ // running queue
+ testJobOrderChange(fjob1, fjob2, false);
// schedule a task
List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
@@ -578,7 +540,8 @@
// mark the job as complete
taskTrackerManager.finalizeJob(fjob1);
- rqueue = scheduler.jobQueuesManager.getRunningJobQueue("default");
+ Collection<JobInProgress> rqueue =
+ scheduler.jobQueuesManager.getRunningJobQueue("default");
// check if the job is removed from the scheduler
assertFalse("Scheduler contains completed job",
@@ -586,8 +549,67 @@
// check if the running queue size is correct
assertEquals("Job finish garbles the queue",
- 0, rqueue.size());
+ 1, rqueue.size());
+
+ }
+
+ // test if the queue reflects the changes
+ private void testJobOrderChange(FakeJobInProgress fjob1,
+ FakeJobInProgress fjob2,
+ boolean waiting) {
+ String queueName = waiting ? "waiting" : "running";
+
+ // check if the jobs in the queue are the right order
+ JobInProgress[] jobs = getJobsInQueue(waiting);
+ assertTrue(queueName + " queue doesnt contain job #1 in right order",
+ jobs[0].getJobID().equals(fjob1.getJobID()));
+ assertTrue(queueName + " queue doesnt contain job #2 in right order",
+ jobs[1].getJobID().equals(fjob2.getJobID()));
+
+ // I. Check the start-time change
+ // Change job2 start-time and check if job2 bumps up in the queue
+ taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
+
+ jobs = getJobsInQueue(waiting);
+ // check the size first so a dropped job fails with a clear message
+ assertEquals("Start-time change garbled the " + queueName + " queue",
+ 2, jobs.length);
+ assertTrue("Start time change did not work as expected for job #2 in "
+ + queueName + " queue",
+ jobs[0].getJobID().equals(fjob2.getJobID()));
+ assertTrue("Start time change did not work as expected for job #1 in "
+ + queueName + " queue",
+ jobs[1].getJobID().equals(fjob1.getJobID()));
+
+ // II. Check the priority change
+ // Bump up job1's priority and make sure job1 bumps up in the queue
+ taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
+
+ // Check if the priority changes are reflected
+ jobs = getJobsInQueue(waiting);
+ // check the size first so a dropped job fails with a clear message
+ assertEquals("Priority change has garbled the " + queueName + " queue",
+ 2, jobs.length);
+
+ assertTrue("Priority change did not work as expected for job #1 in "
+ + queueName + " queue",
+ jobs[0].getJobID().equals(fjob1.getJobID()));
+ assertTrue("Priority change did not work as expected for job #2 in "
+ + queueName + " queue",
+ jobs[1].getJobID().equals(fjob2.getJobID()));
+
+ // reset the queue state back to normal
+ taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
+ taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
+ }
+
+ private JobInProgress[] getJobsInQueue(boolean waiting) {
+ Collection<JobInProgress> queue =
+ waiting
+ ? scheduler.jobQueuesManager.getWaitingJobQueue("default")
+ : scheduler.jobQueuesManager.getRunningJobQueue("default");
+ return queue.toArray(new JobInProgress[0]);
}
/*protected void submitJobs(int number, int state, int maps, int reduces)