You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/10/16 00:15:43 UTC
svn commit: r705075 - in /hadoop/core/branches/branch-0.19: ./
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/
Author: omalley
Date: Wed Oct 15 15:15:41 2008
New Revision: 705075
URL: http://svn.apache.org/viewvc?rev=705075&view=rev
Log:
HADOOP-4053. Schedulers must be notified when jobs complete. (Amar Kamat via omalley)
Added:
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java
- copied unchanged from r705073, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java
- copied unchanged from r705073, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
- copied unchanged from r705073, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Wed Oct 15 15:15:41 2008
@@ -873,6 +873,8 @@
HADOOP-4373. Fix calculation of Guaranteed Capacity for the
capacity-scheduler. (Hemanth Yamijala via acmurthy)
+ HADOOP-4053. Schedulers must be notified when jobs complete. (Amar Kamat via omalley)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Oct 15 15:15:41 2008
@@ -33,6 +33,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
import org.apache.hadoop.util.StringUtils;
@@ -680,7 +681,6 @@
// this job is a candidate for running. Initialize it, move it
// to run queue
j.initTasks();
- scheduler.jobQueuesManager.jobUpdated(j);
// We found a suitable job. Get task from it.
t = obtainNewTask(taskTracker, j);
if (t != null) {
@@ -714,7 +714,6 @@
scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
if (usersOverLimit.contains(j.getProfile().getUser())) {
j.initTasks();
- scheduler.jobQueuesManager.jobUpdated(j);
t = obtainNewTask(taskTracker, j);
if (t != null) {
LOG.debug("Getting task from job " +
@@ -1144,8 +1143,8 @@
reduceScheduler.jobAdded(job);
}
- // called when a job is removed
- synchronized void jobRemoved(JobInProgress job) {
+ // called when a job completes
+ synchronized void jobCompleted(JobInProgress job) {
// let our map and reduce schedulers know this, so they can update
// user-specific info
mapScheduler.jobRemoved(job);
Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Wed Oct 15 15:15:41 2008
@@ -27,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
/**
* A {@link JobInProgressListener} that maintains the jobs being managed in
@@ -173,25 +174,70 @@
scheduler.jobAdded(job);
}
- @Override
- public void jobRemoved(JobInProgress job) {
- QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
- if (null == qi) {
- // can't find queue for job. Shouldn't happen.
- LOG.warn("Could not find queue " + job.getProfile().getQueueName() +
- " when removing job " + job.getProfile().getJobID());
- return;
- }
+ private void jobCompleted(JobInProgress job, QueueInfo qi) {
+ LOG.info("Job " + job.getJobID().toString() + " submitted to queue "
+ + job.getProfile().getQueueName() + " has completed");
// job could be in running or waiting queue
if (!qi.runningJobs.remove(job)) {
QueueInfo.removeOb(qi.waitingJobs, job);
}
// let scheduler know
- scheduler.jobRemoved(job);
+ scheduler.jobCompleted(job);
+ }
+
+ // Note that the job is removed when it completes, i.e. in jobUpdated()
+ @Override
+ public void jobRemoved(JobInProgress job) {}
+
+ // This is used to reposition a job in the queue. A job can get repositioned
+ // because of the change in the job priority or job start-time.
+ private void reorderJobs(JobInProgress job, QueueInfo qi) {
+ Collection<JobInProgress> queue = qi.waitingJobs;
+
+ // Remove from the waiting queue
+ if (!QueueInfo.removeOb(queue, job)) {
+ queue = qi.runningJobs;
+ QueueInfo.removeOb(queue, job);
+ }
+
+ // Add back to the queue
+ queue.add(job);
+ }
+
+ // This is used to move a job from the waiting queue to the running queue.
+ private void makeJobRunning(JobInProgress job, QueueInfo qi) {
+ // Remove from the waiting queue
+ QueueInfo.removeOb(qi.waitingJobs, job);
+
+ // Add the job to the running queue
+ qi.runningJobs.add(job);
+ }
+
+ // Update the scheduler as job's state has changed
+ private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
+ JobInProgress job = event.getJobInProgress();
+ // Check if the ordering of the job has changed
+ // For now priority and start-time can change the job ordering
+ if (event.getEventType() == EventType.PRIORITY_CHANGED
+ || event.getEventType() == EventType.START_TIME_CHANGED) {
+ // Make a priority change
+ reorderJobs(job, qi);
+ } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
+ // Check if the job is complete
+ int runState = job.getStatus().getRunState();
+ if (runState == JobStatus.SUCCEEDED
+ || runState == JobStatus.FAILED
+ || runState == JobStatus.KILLED) {
+ jobCompleted(job, qi);
+ } else if (runState == JobStatus.RUNNING) {
+ makeJobRunning(job, qi);
+ }
+ }
}
@Override
- public void jobUpdated(JobInProgress job) {
+ public void jobUpdated(JobChangeEvent event) {
+ JobInProgress job = event.getJobInProgress();
QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
if (null == qi) {
// can't find queue for job. Shouldn't happen.
@@ -199,18 +245,11 @@
" when updating job " + job.getProfile().getJobID());
return;
}
- // this is called when a job's priority or state is changed.
- // since we don't know the job's previous state, we need to
- // find out in which queue it was earlier, and then place it in the
- // right queue.
- Collection<JobInProgress> dest = (job.getStatus().getRunState() ==
- JobStatus.PREP)? qi.waitingJobs: qi.runningJobs;
- // We use our own version of removing objects based on referential
- // equality, since the 'job' object has already been changed.
- if (!QueueInfo.removeOb(qi.waitingJobs, job)) {
- qi.runningJobs.remove(job);
+
+ // Check if this is the status change
+ if (event instanceof JobStatusChangeEvent) {
+ jobStateChanged((JobStatusChangeEvent)event, qi);
}
- dest.add(job);
}
}
Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Oct 15 15:15:41 2008
@@ -33,6 +33,7 @@
import junit.framework.TestCase;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
//import org.apache.hadoop.mapred.CapacityTaskScheduler;
import org.apache.hadoop.conf.Configuration;
@@ -289,6 +290,32 @@
status.setRunState(TaskStatus.State.SUCCEEDED);
}
+ void finalizeJob(FakeJobInProgress fjob) {
+ // take a snapshot of the status before changing it
+ JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
+ fjob.getStatus().setRunState(JobStatus.SUCCEEDED);
+ JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus,
+ newStatus);
+ for (JobInProgressListener listener : listeners) {
+ listener.jobUpdated(event);
+ }
+ }
+
+ public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
+ // take a snapshot of the status before changing it
+ JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
+ fjob.setPriority(priority);
+ JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus,
+ newStatus);
+ for (JobInProgressListener listener : listeners) {
+ listener.jobUpdated(event);
+ }
+ }
+
void addQueues(String[] arr) {
Set<String> queues = new HashSet<String>();
for (String s: arr) {
@@ -415,6 +442,84 @@
return job;
}
+ // Submit a job and update the listeners
+ private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
+ String queue, String user)
+ throws IOException {
+ FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
+ scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+ return j;
+ }
+
+ // Note that there is no concept of setup tasks here. So init itself should
+ // report the job-status change
+ private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip)
+ throws IOException {
+ JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+ jip.initTasks();
+ JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+ return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
+ oldStatus, newStatus);
+ }
+
+ // test job run-state change
+ public void testJobRunStateChange() throws IOException {
+ // start the scheduler
+ taskTrackerManager.addQueues(new String[] {"default"});
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 1f, 1, true, 1));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit the job
+ FakeJobInProgress fjob1 =
+ submitJob(JobStatus.PREP, 1, 0, "default", "user");
+
+ FakeJobInProgress fjob2 =
+ submitJob(JobStatus.PREP, 1, 0, "default", "user");
+
+ // check if the job is in the waiting queue
+ assertTrue("Waiting queue doesnt contain queued job",
+ scheduler.jobQueuesManager.getWaitingJobQueue("default")
+ .contains(fjob1));
+
+ // change the job priority
+ taskTrackerManager.setPriority(fjob2, JobPriority.HIGH);
+
+ // Check if the priority changes are reflected
+ JobInProgress firstJob =
+ scheduler.getJobs("default").toArray(new JobInProgress[0])[0];
+ assertTrue("Priority change didnt not work as expected",
+ firstJob.getJobID().equals(fjob2.getJobID()));
+
+ // Create an event
+ JobChangeEvent event = initTasksAndReportEvent(fjob1);
+
+ // inform the scheduler
+ scheduler.jobQueuesManager.jobUpdated(event);
+
+ // check if the job is in the running queue
+ assertTrue("Running queue doesnt contain running/inited job",
+ scheduler.jobQueuesManager.getRunningJobQueue("default")
+ .contains(fjob1));
+
+ // schedule a task
+ List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+
+ // complete the job
+ taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(),
+ fjob1);
+
+ // mark the job as complete
+ taskTrackerManager.finalizeJob(fjob1);
+
+ // check if the job is removed from the scheduler
+ assertFalse("Scheduler contains completed job",
+ scheduler.getJobs("default").contains(fjob1));
+ }
+
/*protected void submitJobs(int number, int state, int maps, int reduces)
throws IOException {
for (int i = 0; i < number; i++) {
@@ -435,14 +540,8 @@
scheduler.start();
// submit 2 jobs
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
-
- // init them and inform the scheduler
- j1.initTasks();
- scheduler.jobQueuesManager.jobUpdated(j1);
- j2.initTasks();
- scheduler.jobQueuesManager.jobUpdated(j2);
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
// I. Check multiple assignments with running tasks within job
// ask for a task from first job
@@ -474,8 +573,7 @@
// IV. Check assignment with completed job
// finish first job
- j1.getStatus().setRunState(JobStatus.SUCCEEDED);
- scheduler.jobRemoved(j1);
+ scheduler.jobCompleted(j1);
// ask for another task from the second job
// if tasks can be assigned then the structures are properly updated
@@ -500,12 +598,13 @@
// submit a job with no queue specified. It should be accepted
// and given to the default queue.
- JobInProgress j = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+
// when we ask for a task, we should get one, from the job submitted
Task t;
t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// submit another job, to a different queue
- j = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// now when we get a task, it should be from the second job
t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
}
@@ -522,7 +621,7 @@
scheduler.start();
// submit a job
- JobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
+ JobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// submit another job
@@ -610,7 +709,7 @@
scheduler.start();
// submit a job
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the GC for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
@@ -637,12 +736,12 @@
scheduler.start();
// submit a job
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the GC for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// Submit another job, from a different user
- submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a map task, it should come from the second job
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
// Now we're at full capacity for maps. If I ask for another map task,
@@ -666,14 +765,14 @@
scheduler.start();
// submit a job
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the GC for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// since we're the only job, we get another map
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
// Submit another job, from a different user
- submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a map task, it should come from the second job
checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
// and another
@@ -695,7 +794,7 @@
scheduler.start();
// submit a job
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the GC for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
@@ -705,7 +804,7 @@
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
// Submit another job, from a different user
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// one of the task finishes
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
// Now if I ask for a map task, it should come from the second job
@@ -738,7 +837,7 @@
taskTrackerManager.addTaskTracker("tt5");
// u1 submits job
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// it gets the first 5 slots
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
@@ -746,7 +845,7 @@
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
// u2 submits job with 4 slots
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 4, 4, null, "u2");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
// u2 should get next 4 slots
checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
@@ -793,8 +892,8 @@
// set up a situation where q2 is under capacity, and default & q3
// are at/over capacity
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
@@ -841,11 +940,11 @@
taskTrackerManager.addTaskTracker("tt5");
// q2 has nothing running, default is under cap, q3 and q4 are over cap
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 2, 2, null, "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 2, 2, null, "u1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q4", "u1");
+ FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q4", "u1");
checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
@@ -893,13 +992,13 @@
// set up a situation where q2 is under capacity, and default is
// at/over capacity
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
// now submit a job to q2
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 1, 1, "q2", "u1");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
// update our structures
scheduler.updateQSIInfo();
// get scheduler to notice that q2 needs to reclaim
@@ -908,7 +1007,7 @@
// we start reclaiming when 15 secs are left.
clock.advance(400000);
// submit another job to q2 which causes more capacity to be reclaimed
- j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// update our structures
scheduler.updateQSIInfo();
clock.advance(200000);
Modified: hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Oct 15 15:15:41 2008
@@ -190,7 +190,7 @@
}
@Override
- public void jobUpdated(JobInProgress job) {
+ public void jobUpdated(JobChangeEvent event) {
}
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Wed Oct 15 15:15:41 2008
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.util.StringUtils;
/**
@@ -132,9 +133,20 @@
}
@Override
- public void jobUpdated(JobInProgress job) {
- synchronized (jobInitQueue) {
- resortInitQueue();
+ public void jobUpdated(JobChangeEvent event) {
+ if (event instanceof JobStatusChangeEvent) {
+ jobStateChanged((JobStatusChangeEvent)event);
+ }
+ }
+
+ // called when the job's status is changed
+ private void jobStateChanged(JobStatusChangeEvent event) {
+ // Resort the job queue if the job-start-time or job-priority changes
+ if (event.getEventType() == EventType.START_TIME_CHANGED
+ || event.getEventType() == EventType.PRIORITY_CHANGED) {
+ synchronized (jobInitQueue) {
+ resortInitQueue();
+ }
}
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 15 15:15:41 2008
@@ -2385,4 +2385,11 @@
boolean initStarted;
boolean initDone;
}
+
+ boolean isComplete() {
+ int runState = this.status.getRunState();
+ return runState == JobStatus.SUCCEEDED
+ || runState == JobStatus.FAILED
+ || runState == JobStatus.KILLED;
+ }
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java Wed Oct 15 15:15:41 2008
@@ -37,7 +37,8 @@
/**
* Invoked when a job has been updated in the {@link JobTracker}.
- * @param job The updated job.
+ * This change in the job is tracked using {@link JobChangeEvent}.
+ * @param event the event that tracks the change
*/
- public abstract void jobUpdated(JobInProgress job);
+ public abstract void jobUpdated(JobChangeEvent event);
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java Wed Oct 15 15:15:41 2008
@@ -22,6 +22,8 @@
import java.util.Comparator;
import java.util.TreeSet;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+
/**
* A {@link JobInProgressListener} that maintains the jobs being managed in
* a queue. By default the queue is FIFO, but it is possible to use custom
@@ -74,13 +76,38 @@
jobQueue.add(job);
}
+ // Job will be removed once the job completes
@Override
- public void jobRemoved(JobInProgress job) {
+ public void jobRemoved(JobInProgress job) {}
+
+ private void jobCompleted(JobInProgress job) {
jobQueue.remove(job);
}
@Override
- public synchronized void jobUpdated(JobInProgress job) {
+ public synchronized void jobUpdated(JobChangeEvent event) {
+ JobInProgress job = event.getJobInProgress();
+ if (event instanceof JobStatusChangeEvent) {
+ // Check if the ordering of the job has changed
+ // For now priority and start-time can change the job ordering
+ JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
+ if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED
+ || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
+ // Make a priority change
+ reorderJobs(job);
+ } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
+ // Check if the job is complete
+ int runState = statusEvent.getNewStatus().getRunState();
+ if (runState == JobStatus.SUCCEEDED
+ || runState == JobStatus.FAILED
+ || runState == JobStatus.KILLED) {
+ jobCompleted(job);
+ }
+ }
+ }
+ }
+
+ private void reorderJobs(JobInProgress job) {
synchronized (jobQueue) {
jobQueue.remove(job);
jobQueue.add(job);
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java Wed Oct 15 15:15:41 2008
@@ -32,7 +32,7 @@
* not intended to be a comprehensive piece of data.
* For that, look at JobProfile.
**************************************************/
-public class JobStatus implements Writable {
+public class JobStatus implements Writable, Cloneable {
static { // register a ctor
WritableFactories.setFactory
@@ -216,6 +216,16 @@
*/
synchronized public long getStartTime() { return startTime;}
+ @Override
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen since we do implement Clonable
+ throw new InternalError(cnse.toString());
+ }
+ }
+
/**
* @param user The username of the job
*/
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Oct 15 15:15:41 2008
@@ -57,6 +57,7 @@
import org.apache.hadoop.mapred.JobHistory.Keys;
import org.apache.hadoop.mapred.JobHistory.Listener;
import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -534,25 +535,16 @@
hasUpdates = true;
LOG.info("Calling init from RM for job " + jip.getJobID().toString());
jip.initTasks();
- updateJobListeners();
- }
- }
-
- private void updateJobListeners() {
- // The scheduler needs to be informed as the recovery-manager
- // has inited the jobs
- for (JobInProgressListener listener : jobInProgressListeners) {
- listener.jobUpdated(jip);
}
}
void close() {
if (hasUpdates) {
// Apply the final (job-level) updates
- updateJob(jip, job);
- // Update the job listeners as the start/submit time and the job
- // priority has changed
- updateJobListeners();
+ JobStatusChangeEvent event = updateJob(jip, job);
+
+ // Update the job listeners
+ updateJobInProgressListeners(event);
}
}
@@ -612,20 +604,29 @@
}
}
- private void updateJob(JobInProgress jip, JobHistory.JobInfo job) {
+ private JobStatusChangeEvent updateJob(JobInProgress jip,
+ JobHistory.JobInfo job) {
+ // Change the job priority
+ String jobpriority = job.get(Keys.JOB_PRIORITY);
+ JobPriority priority = JobPriority.valueOf(jobpriority);
+ // It's important to update this via the jobtracker's api as it will
+ // take care of updating the event listeners too
+ setJobPriority(jip.getJobID(), priority);
+
+ // Save the previous job status
+ JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+
// Set the start/launch time only if there are recovered tasks
// Increment the job's restart count
jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME),
job.getLong(JobHistory.Keys.LAUNCH_TIME),
job.getInt(Keys.RESTART_COUNT) + 1);
+
+ // Save the new job status
+ JobStatus newStatus = (JobStatus)jip.getStatus().clone();
- // Change the job priority
- String jobpriority = job.get(Keys.JOB_PRIORITY);
- if (jobpriority.length() > 0) {
- JobPriority priority = JobPriority.valueOf(jobpriority);
- // Its important to update this via the jobtracker's api
- setJobPriority(jip.getJobID(), priority);
- }
+ return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus,
+ newStatus);
}
private void updateTip(TaskInProgress tip, JobHistory.Task task) {
@@ -1761,6 +1762,13 @@
jobInProgressListeners.remove(listener);
}
+ // Update the listeners about the job
+ private void updateJobInProgressListeners(JobChangeEvent event) {
+ for (JobInProgressListener listener : jobInProgressListeners) {
+ listener.jobUpdated(event);
+ }
+ }
+
/**
* Return the {@link QueueManager} associated with the JobTracker.
*/
@@ -2263,8 +2271,24 @@
public synchronized void killJob(JobID jobid) throws IOException {
JobInProgress job = jobs.get(jobid);
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
job.kill();
+
+ // Inform the listeners if the job is killed
+ // Note :
+ // If the job is killed in the PREP state then the listeners will be
+ // invoked
+ // If the job is killed in the RUNNING state then cleanup tasks will be
+ // launched and the updateTaskStatuses() will take care of it
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+ if (prevStatus.getRunState() != newStatus.getRunState()
+ && newStatus.getRunState() == JobStatus.KILLED) {
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
+ newStatus);
+ updateJobInProgressListeners(event);
+ }
}
/**
@@ -2530,10 +2554,13 @@
JobInProgress job = jobs.get(jobId);
if (job != null) {
synchronized (taskScheduler) {
+ JobStatus oldStatus = (JobStatus)job.getStatus().clone();
job.setPriority(priority);
- for (JobInProgressListener listener : jobInProgressListeners) {
- listener.jobUpdated(job);
- }
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.PRIORITY_CHANGED, oldStatus,
+ newStatus);
+ updateJobInProgressListeners(event);
}
} else {
LOG.warn("Trying to change the priority of an unknown job: " + jobId);
@@ -2559,13 +2586,25 @@
// Check if the tip is known to the jobtracker. In case of a restarted
// jt, some tasks might join in later
if (tip != null || hasRestarted()) {
+ JobInProgress job = getJob(taskId.getJobID());
if (tip == null) {
- JobInProgress job = getJob(taskId.getJobID());
tip = job.getTaskInProgress(taskId.getTaskID());
job.addRunningTaskToTIP(tip, taskId, status, false);
}
expireLaunchingTasks.removeTask(taskId);
- tip.getJob().updateTaskStatus(tip, report, myInstrumentation);
+
+ // Update the job and inform the listeners if necessary
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+ job.updateTaskStatus(tip, report, myInstrumentation);
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+
+ // Update the listeners if an incomplete job completes
+ if (prevStatus.getRunState() != newStatus.getRunState()) {
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED,
+ prevStatus, newStatus);
+ updateJobInProgressListeners(event);
+ }
} else {
LOG.info("Serious problem. While updating status, cannot find taskid "
+ report.getTaskID());
Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java Wed Oct 15 15:15:41 2008
@@ -40,7 +40,7 @@
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
- private void runJobFail(JobConf conf) throws IOException {
+ static JobID runJobFail(JobConf conf) throws IOException {
conf.setJobName("testjobfail");
conf.setMapperClass(FailMapper.class);
@@ -55,9 +55,11 @@
}
// Checking that the Job got failed
assertEquals(job.getJobState(), JobStatus.FAILED);
+
+ return job.getID();
}
- private void runJobKill(JobConf conf) throws IOException {
+ static JobID runJobKill(JobConf conf) throws IOException {
conf.setJobName("testjobkill");
conf.setMapperClass(KillMapper.class);
@@ -81,9 +83,11 @@
// Checking that the Job got killed
assertTrue(job.isComplete());
assertEquals(job.getJobState(), JobStatus.KILLED);
+
+ return job.getID();
}
- private RunningJob runJob(JobConf conf) throws IOException {
+ static RunningJob runJob(JobConf conf) throws IOException {
final Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
final Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java Wed Oct 15 15:15:41 2008
@@ -29,6 +29,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -42,6 +44,21 @@
private MiniDFSCluster dfsCluster;
private JobConf jc;
private static final String JOB_SCHEDULING_INFO = "TESTSCHEDULINGINFO";
+ private static final Path TEST_DIR = new Path("job-queue-info-testing");
+
+ // configure a waiting job with 2 maps
+ private JobConf configureWaitingJob(JobConf conf) throws IOException {
+ Path inDir = new Path(TEST_DIR, "input");
+ Path shareDir = new Path(TEST_DIR, "share");
+ Path outputDir = new Path(TEST_DIR, "output");
+ String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
+ String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
+ JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL};
+ return TestJobTrackerRestart.getJobs(conf, priority,
+ new int[] {2}, new int[] {0},
+ outputDir, inDir,
+ mapSignalFile, redSignalFile)[0];
+ }
public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler {
@@ -89,10 +106,21 @@
assertEquals(1, queueInfos.length);
assertEquals("default", queueInfos[0].getQueueName());
JobConf conf = mrCluster.createJobConf();
- SleepJob sleepJob = new SleepJob();
- sleepJob.setConf(conf);
- conf = sleepJob.setupJobConf(4, 2, 1, 1, 1, 1);
- JobClient.runJob(conf);
+ FileSystem fileSys = dfsCluster.getFileSystem();
+
+ // configure a waiting job
+ conf = configureWaitingJob(conf);
+ conf.setJobName("test-job-queue-info-test");
+
+ // clear the signal file if any
+ TestJobTrackerRestart.cleanUp(fileSys, TEST_DIR);
+
+ RunningJob rJob = jc.submitJob(conf);
+
+ while (rJob.getJobState() != JobStatus.RUNNING) {
+ TestJobTrackerRestart.waitFor(10);
+ }
+
int numberOfJobs = 0;
for (JobQueueInfo queueInfo : queueInfos) {