You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/10/16 00:12:20 UTC
svn commit: r705073 - in /hadoop/core/trunk: ./
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/mapred/...
Author: omalley
Date: Wed Oct 15 15:12:19 2008
New Revision: 705073
URL: http://svn.apache.org/viewvc?rev=705073&view=rev
Log:
HADOOP-4053. Schedulers must be notified when jobs complete. (Amar Kamat via omalley)
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Oct 15 15:12:19 2008
@@ -927,6 +927,8 @@
HADOOP-4373. Fix calculation of Guaranteed Capacity for the
capacity-scheduler. (Hemanth Yamijala via acmurthy)
+ HADOOP-4053. Schedulers must be notified when jobs complete. (Amar Kamat via omalley)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Oct 15 15:12:19 2008
@@ -33,6 +33,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
import org.apache.hadoop.util.StringUtils;
@@ -681,7 +682,6 @@
// this job is a candidate for running. Initialize it, move it
// to run queue
j.initTasks();
- scheduler.jobQueuesManager.jobUpdated(j);
// We found a suitable job. Get task from it.
t = obtainNewTask(taskTracker, j);
if (t != null) {
@@ -715,7 +715,6 @@
scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
if (usersOverLimit.contains(j.getProfile().getUser())) {
j.initTasks();
- scheduler.jobQueuesManager.jobUpdated(j);
t = obtainNewTask(taskTracker, j);
if (t != null) {
LOG.debug("Getting task from job " +
@@ -1161,8 +1160,8 @@
reduceScheduler.jobAdded(job);
}
- // called when a job is removed
- synchronized void jobRemoved(JobInProgress job) {
+ // called when a job completes
+ synchronized void jobCompleted(JobInProgress job) {
// let our map and reduce schedulers know this, so they can update
// user-specific info
mapScheduler.jobRemoved(job);
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Wed Oct 15 15:12:19 2008
@@ -27,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
/**
* A {@link JobInProgressListener} that maintains the jobs being managed in
@@ -173,25 +174,70 @@
scheduler.jobAdded(job);
}
- @Override
- public void jobRemoved(JobInProgress job) {
- QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
- if (null == qi) {
- // can't find queue for job. Shouldn't happen.
- LOG.warn("Could not find queue " + job.getProfile().getQueueName() +
- " when removing job " + job.getProfile().getJobID());
- return;
- }
+ private void jobCompleted(JobInProgress job, QueueInfo qi) {
+ LOG.info("Job " + job.getJobID().toString() + " submitted to queue "
+ + job.getProfile().getQueueName() + " has completed");
// job could be in running or waiting queue
if (!qi.runningJobs.remove(job)) {
QueueInfo.removeOb(qi.waitingJobs, job);
}
// let scheduler know
- scheduler.jobRemoved(job);
+ scheduler.jobCompleted(job);
+ }
+
+ // Note that the job is removed when it completes, i.e. in jobUpdated()
+ @Override
+ public void jobRemoved(JobInProgress job) {}
+
+ // This is used to reposition a job in the queue. A job can get repositioned
+ // because of the change in the job priority or job start-time.
+ private void reorderJobs(JobInProgress job, QueueInfo qi) {
+ Collection<JobInProgress> queue = qi.waitingJobs;
+
+ // Remove from the waiting queue
+ if (!QueueInfo.removeOb(queue, job)) {
+ queue = qi.runningJobs;
+ QueueInfo.removeOb(queue, job);
+ }
+
+ // Add back to the queue
+ queue.add(job);
+ }
+
+ // This is used to move a job from the waiting queue to the running queue.
+ private void makeJobRunning(JobInProgress job, QueueInfo qi) {
+ // Remove from the waiting queue
+ QueueInfo.removeOb(qi.waitingJobs, job);
+
+ // Add the job to the running queue
+ qi.runningJobs.add(job);
+ }
+
+ // Update the scheduler as job's state has changed
+ private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
+ JobInProgress job = event.getJobInProgress();
+ // Check if the ordering of the job has changed
+ // For now priority and start-time can change the job ordering
+ if (event.getEventType() == EventType.PRIORITY_CHANGED
+ || event.getEventType() == EventType.START_TIME_CHANGED) {
+ // Reposition the job, since its priority or start-time changed
+ reorderJobs(job, qi);
+ } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
+ // Check if the job is complete
+ int runState = job.getStatus().getRunState();
+ if (runState == JobStatus.SUCCEEDED
+ || runState == JobStatus.FAILED
+ || runState == JobStatus.KILLED) {
+ jobCompleted(job, qi);
+ } else if (runState == JobStatus.RUNNING) {
+ makeJobRunning(job, qi);
+ }
+ }
}
@Override
- public void jobUpdated(JobInProgress job) {
+ public void jobUpdated(JobChangeEvent event) {
+ JobInProgress job = event.getJobInProgress();
QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
if (null == qi) {
// can't find queue for job. Shouldn't happen.
@@ -199,18 +245,11 @@
" when updating job " + job.getProfile().getJobID());
return;
}
- // this is called when a job's priority or state is changed.
- // since we don't know the job's previous state, we need to
- // find out in which queue it was earlier, and then place it in the
- // right queue.
- Collection<JobInProgress> dest = (job.getStatus().getRunState() ==
- JobStatus.PREP)? qi.waitingJobs: qi.runningJobs;
- // We use our own version of removing objects based on referential
- // equality, since the 'job' object has already been changed.
- if (!QueueInfo.removeOb(qi.waitingJobs, job)) {
- qi.runningJobs.remove(job);
+
+ // Check if this is a status change
+ if (event instanceof JobStatusChangeEvent) {
+ jobStateChanged((JobStatusChangeEvent)event, qi);
}
- dest.add(job);
}
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Oct 15 15:12:19 2008
@@ -33,6 +33,7 @@
import junit.framework.TestCase;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
//import org.apache.hadoop.mapred.CapacityTaskScheduler;
import org.apache.hadoop.conf.Configuration;
@@ -289,6 +290,32 @@
status.setRunState(TaskStatus.State.SUCCEEDED);
}
+ void finalizeJob(FakeJobInProgress fjob) {
+ // take a snapshot of the status before changing it
+ JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
+ fjob.getStatus().setRunState(JobStatus.SUCCEEDED);
+ JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus,
+ newStatus);
+ for (JobInProgressListener listener : listeners) {
+ listener.jobUpdated(event);
+ }
+ }
+
+ public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
+ // take a snapshot of the status before changing it
+ JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
+ fjob.setPriority(priority);
+ JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus,
+ newStatus);
+ for (JobInProgressListener listener : listeners) {
+ listener.jobUpdated(event);
+ }
+ }
+
void addQueues(String[] arr) {
Set<String> queues = new HashSet<String>();
for (String s: arr) {
@@ -418,6 +445,84 @@
return job;
}
+ // Submit a job and update the listeners
+ private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
+ String queue, String user)
+ throws IOException {
+ FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
+ scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+ return j;
+ }
+
+ // Note that there is no concept of setup tasks here. So init itself should
+ // report the job-status change
+ private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip)
+ throws IOException {
+ JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+ jip.initTasks();
+ JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+ return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
+ oldStatus, newStatus);
+ }
+
+ // test job run-state change
+ public void testJobRunStateChange() throws IOException {
+ // start the scheduler
+ taskTrackerManager.addQueues(new String[] {"default"});
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 1f, 1, true, 1));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit the job
+ FakeJobInProgress fjob1 =
+ submitJob(JobStatus.PREP, 1, 0, "default", "user");
+
+ FakeJobInProgress fjob2 =
+ submitJob(JobStatus.PREP, 1, 0, "default", "user");
+
+ // check if the job is in the waiting queue
+ assertTrue("Waiting queue doesnt contain queued job",
+ scheduler.jobQueuesManager.getWaitingJobQueue("default")
+ .contains(fjob1));
+
+ // change the job priority
+ taskTrackerManager.setPriority(fjob2, JobPriority.HIGH);
+
+ // Check if the priority changes are reflected
+ JobInProgress firstJob =
+ scheduler.getJobs("default").toArray(new JobInProgress[0])[0];
+ assertTrue("Priority change didnt not work as expected",
+ firstJob.getJobID().equals(fjob2.getJobID()));
+
+ // Create an event
+ JobChangeEvent event = initTasksAndReportEvent(fjob1);
+
+ // inform the scheduler
+ scheduler.jobQueuesManager.jobUpdated(event);
+
+ // check if the job is in the running queue
+ assertTrue("Running queue doesnt contain running/inited job",
+ scheduler.jobQueuesManager.getRunningJobQueue("default")
+ .contains(fjob1));
+
+ // schedule a task
+ List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+
+ // finish the job's only task
+ taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(),
+ fjob1);
+
+ // mark the job as complete
+ taskTrackerManager.finalizeJob(fjob1);
+
+ // check if the job is removed from the scheduler
+ assertFalse("Scheduler contains completed job",
+ scheduler.getJobs("default").contains(fjob1));
+ }
+
/*protected void submitJobs(int number, int state, int maps, int reduces)
throws IOException {
for (int i = 0; i < number; i++) {
@@ -438,14 +543,8 @@
scheduler.start();
// submit 2 jobs
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
-
- // init them and inform the scheduler
- j1.initTasks();
- scheduler.jobQueuesManager.jobUpdated(j1);
- j2.initTasks();
- scheduler.jobQueuesManager.jobUpdated(j2);
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
// I. Check multiple assignments with running tasks within job
// ask for a task from first job
@@ -477,8 +576,7 @@
// IV. Check assignment with completed job
// finish first job
- j1.getStatus().setRunState(JobStatus.SUCCEEDED);
- scheduler.jobRemoved(j1);
+ scheduler.jobCompleted(j1);
// ask for another task from the second job
// if tasks can be assigned then the structures are properly updated
@@ -503,12 +601,13 @@
// submit a job with no queue specified. It should be accepted
// and given to the default queue.
- JobInProgress j = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+
// when we ask for a task, we should get one, from the job submitted
Task t;
t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// submit another job, to a different queue
- j = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// now when we get a task, it should be from the second job
t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
}
@@ -525,7 +624,7 @@
scheduler.start();
// submit a job
- JobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
+ JobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// submit another job
@@ -635,7 +734,7 @@
scheduler.start();
// submit a job
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the GC for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
@@ -662,12 +761,12 @@
scheduler.start();
// submit a job
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the GC for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// Submit another job, from a different user
- submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a map task, it should come from the second job
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
// Now we're at full capacity for maps. If I ask for another map task,
@@ -691,14 +790,14 @@
scheduler.start();
// submit a job
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the GC for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// since we're the only job, we get another map
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
// Submit another job, from a different user
- submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a map task, it should come from the second job
checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
// and another
@@ -720,7 +819,7 @@
scheduler.start();
// submit a job
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the GC for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
@@ -730,7 +829,7 @@
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
// Submit another job, from a different user
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// one of the task finishes
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
// Now if I ask for a map task, it should come from the second job
@@ -763,7 +862,7 @@
taskTrackerManager.addTaskTracker("tt5");
// u1 submits job
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// it gets the first 5 slots
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
@@ -771,7 +870,7 @@
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
// u2 submits job with 4 slots
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 4, 4, null, "u2");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
// u2 should get next 4 slots
checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
@@ -818,8 +917,8 @@
// set up a situation where q2 is under capacity, and default & q3
// are at/over capacity
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
@@ -866,11 +965,11 @@
taskTrackerManager.addTaskTracker("tt5");
// q2 has nothing running, default is under cap, q3 and q4 are over cap
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 2, 2, null, "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 2, 2, null, "u1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q4", "u1");
+ FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q4", "u1");
checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
@@ -918,13 +1017,13 @@
// set up a situation where q2 is under capacity, and default is
// at/over capacity
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
// now submit a job to q2
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 1, 1, "q2", "u1");
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
// update our structures
scheduler.updateQSIInfo();
// get scheduler to notice that q2 needs to reclaim
@@ -933,7 +1032,7 @@
// we start reclaiming when 15 secs are left.
clock.advance(400000);
// submit another job to q2 which causes more capacity to be reclaimed
- j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// update our structures
scheduler.updateQSIInfo();
clock.advance(200000);
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Oct 15 15:12:19 2008
@@ -190,7 +190,7 @@
}
@Override
- public void jobUpdated(JobInProgress job) {
+ public void jobUpdated(JobChangeEvent event) {
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Wed Oct 15 15:12:19 2008
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.util.StringUtils;
/**
@@ -132,9 +133,20 @@
}
@Override
- public void jobUpdated(JobInProgress job) {
- synchronized (jobInitQueue) {
- resortInitQueue();
+ public void jobUpdated(JobChangeEvent event) {
+ if (event instanceof JobStatusChangeEvent) {
+ jobStateChanged((JobStatusChangeEvent)event);
+ }
+ }
+
+ // called when the job's status is changed
+ private void jobStateChanged(JobStatusChangeEvent event) {
+ // Resort the job queue if the job-start-time or job-priority changes
+ if (event.getEventType() == EventType.START_TIME_CHANGED
+ || event.getEventType() == EventType.PRIORITY_CHANGED) {
+ synchronized (jobInitQueue) {
+ resortInitQueue();
+ }
}
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java?rev=705073&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java Wed Oct 15 15:12:19 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * {@link JobChangeEvent} is used to capture state changes in a job. A job can
+ * change its state w.r.t priority, progress, run-state etc.
+ */
+abstract class JobChangeEvent {
+ private JobInProgress jip;
+
+ JobChangeEvent(JobInProgress jip) {
+ this.jip = jip;
+ }
+
+ /**
+ * Get the job object for which the change is reported
+ */
+ JobInProgress getJobInProgress() {
+ return jip;
+ }
+}
\ No newline at end of file
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 15 15:12:19 2008
@@ -2385,4 +2385,11 @@
boolean initStarted;
boolean initDone;
}
+
+ boolean isComplete() {
+ int runState = this.status.getRunState();
+ return runState == JobStatus.SUCCEEDED
+ || runState == JobStatus.FAILED
+ || runState == JobStatus.KILLED;
+ }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java Wed Oct 15 15:12:19 2008
@@ -37,7 +37,8 @@
/**
* Invoked when a job has been updated in the {@link JobTracker}.
- * @param job The updated job.
+ * This change in the job is tracked using {@link JobChangeEvent}.
+ * @param event the event that tracks the change
*/
- public abstract void jobUpdated(JobInProgress job);
+ public abstract void jobUpdated(JobChangeEvent event);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java Wed Oct 15 15:12:19 2008
@@ -22,6 +22,8 @@
import java.util.Comparator;
import java.util.TreeSet;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+
/**
* A {@link JobInProgressListener} that maintains the jobs being managed in
* a queue. By default the queue is FIFO, but it is possible to use custom
@@ -74,13 +76,38 @@
jobQueue.add(job);
}
+ // Job will be removed once the job completes
@Override
- public void jobRemoved(JobInProgress job) {
+ public void jobRemoved(JobInProgress job) {}
+
+ private void jobCompleted(JobInProgress job) {
jobQueue.remove(job);
}
@Override
- public synchronized void jobUpdated(JobInProgress job) {
+ public synchronized void jobUpdated(JobChangeEvent event) {
+ JobInProgress job = event.getJobInProgress();
+ if (event instanceof JobStatusChangeEvent) {
+ // Check if the ordering of the job has changed
+ // For now priority and start-time can change the job ordering
+ JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
+ if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED
+ || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
+ // Reposition the job, since its priority or start-time changed
+ reorderJobs(job);
+ } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
+ // Check if the job is complete
+ int runState = statusEvent.getNewStatus().getRunState();
+ if (runState == JobStatus.SUCCEEDED
+ || runState == JobStatus.FAILED
+ || runState == JobStatus.KILLED) {
+ jobCompleted(job);
+ }
+ }
+ }
+ }
+
+ private void reorderJobs(JobInProgress job) {
synchronized (jobQueue) {
jobQueue.remove(job);
jobQueue.add(job);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatus.java Wed Oct 15 15:12:19 2008
@@ -32,7 +32,7 @@
* not intended to be a comprehensive piece of data.
* For that, look at JobProfile.
**************************************************/
-public class JobStatus implements Writable {
+public class JobStatus implements Writable, Cloneable {
static { // register a ctor
WritableFactories.setFactory
@@ -216,6 +216,16 @@
*/
synchronized public long getStartTime() { return startTime;}
+ @Override
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen since we do implement Cloneable
+ throw new InternalError(cnse.toString());
+ }
+ }
+
/**
* @param user The username of the job
*/
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java?rev=705073&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java Wed Oct 15 15:12:19 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * {@link JobStatusChangeEvent} tracks the change in job's status. Job's
+ * status can change w.r.t
+ * - run state i.e PREP, RUNNING, FAILED, KILLED, SUCCEEDED
+ * - start time
+ * - priority
+ * Note that job times can change as the job can get restarted.
+ */
+class JobStatusChangeEvent extends JobChangeEvent {
+ // Events in job status that can lead to a job-status change
+ static enum EventType {RUN_STATE_CHANGED, START_TIME_CHANGED, PRIORITY_CHANGED}
+
+ private JobStatus oldStatus;
+ private JobStatus newStatus;
+ private EventType eventType;
+
+ JobStatusChangeEvent(JobInProgress jip, EventType eventType,
+ JobStatus oldStatus, JobStatus newStatus) {
+ super(jip);
+ this.oldStatus = oldStatus;
+ this.newStatus = newStatus;
+ this.eventType = eventType;
+ }
+
+ /**
+ * Create a {@link JobStatusChangeEvent} indicating the state has changed.
+ * Note that here we assume that the state change doesn't care about the old
+ * state.
+ */
+ JobStatusChangeEvent(JobInProgress jip, EventType eventType, JobStatus status)
+ {
+ this(jip, eventType, status, status);
+ }
+
+ /**
+ * Returns the event type that caused the state change
+ */
+ EventType getEventType() {
+ return eventType;
+ }
+
+ /**
+ * Get the old job status
+ */
+ JobStatus getOldStatus() {
+ return oldStatus;
+ }
+
+ /**
+ * Get the new job status as a result of the events
+ */
+ JobStatus getNewStatus() {
+ return newStatus;
+ }
+}
\ No newline at end of file
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Oct 15 15:12:19 2008
@@ -57,6 +57,7 @@
import org.apache.hadoop.mapred.JobHistory.Keys;
import org.apache.hadoop.mapred.JobHistory.Listener;
import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -534,25 +535,16 @@
hasUpdates = true;
LOG.info("Calling init from RM for job " + jip.getJobID().toString());
jip.initTasks();
- updateJobListeners();
- }
- }
-
- private void updateJobListeners() {
- // The scheduler needs to be informed as the recovery-manager
- // has inited the jobs
- for (JobInProgressListener listener : jobInProgressListeners) {
- listener.jobUpdated(jip);
}
}
void close() {
if (hasUpdates) {
// Apply the final (job-level) updates
- updateJob(jip, job);
- // Update the job listeners as the start/submit time and the job
- // priority has changed
- updateJobListeners();
+ JobStatusChangeEvent event = updateJob(jip, job);
+
+ // Update the job listeners
+ updateJobInProgressListeners(event);
}
}
@@ -612,20 +604,29 @@
}
}
- private void updateJob(JobInProgress jip, JobHistory.JobInfo job) {
+ private JobStatusChangeEvent updateJob(JobInProgress jip,
+ JobHistory.JobInfo job) {
+ // Change the job priority
+ String jobpriority = job.get(Keys.JOB_PRIORITY);
+ JobPriority priority = JobPriority.valueOf(jobpriority);
+ // It's important to update this via the jobtracker's api as it will
+ // take care of updating the event listeners too
+ setJobPriority(jip.getJobID(), priority);
+
+ // Save the previous job status
+ JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+
// Set the start/launch time only if there are recovered tasks
// Increment the job's restart count
jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME),
job.getLong(JobHistory.Keys.LAUNCH_TIME),
job.getInt(Keys.RESTART_COUNT) + 1);
+
+ // Save the new job status
+ JobStatus newStatus = (JobStatus)jip.getStatus().clone();
- // Change the job priority
- String jobpriority = job.get(Keys.JOB_PRIORITY);
- if (jobpriority.length() > 0) {
- JobPriority priority = JobPriority.valueOf(jobpriority);
- // Its important to update this via the jobtracker's api
- setJobPriority(jip.getJobID(), priority);
- }
+ return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus,
+ newStatus);
}
private void updateTip(TaskInProgress tip, JobHistory.Task task) {
@@ -1761,6 +1762,13 @@
jobInProgressListeners.remove(listener);
}
+ // Update the listeners about the job
+ private void updateJobInProgressListeners(JobChangeEvent event) {
+ for (JobInProgressListener listener : jobInProgressListeners) {
+ listener.jobUpdated(event);
+ }
+ }
+
/**
* Return the {@link QueueManager} associated with the JobTracker.
*/
@@ -2263,8 +2271,24 @@
public synchronized void killJob(JobID jobid) throws IOException {
JobInProgress job = jobs.get(jobid);
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
job.kill();
+
+ // Inform the listeners if the job is killed
+ // Note :
+ // If the job is killed in the PREP state then the listeners will be
+ // invoked
+ // If the job is killed in the RUNNING state then cleanup tasks will be
+ // launched and the updateTaskStatuses() will take care of it
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+ if (prevStatus.getRunState() != newStatus.getRunState()
+ && newStatus.getRunState() == JobStatus.KILLED) {
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
+ newStatus);
+ updateJobInProgressListeners(event);
+ }
}
/**
@@ -2530,10 +2554,13 @@
JobInProgress job = jobs.get(jobId);
if (job != null) {
synchronized (taskScheduler) {
+ JobStatus oldStatus = (JobStatus)job.getStatus().clone();
job.setPriority(priority);
- for (JobInProgressListener listener : jobInProgressListeners) {
- listener.jobUpdated(job);
- }
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.PRIORITY_CHANGED, oldStatus,
+ newStatus);
+ updateJobInProgressListeners(event);
}
} else {
LOG.warn("Trying to change the priority of an unknown job: " + jobId);
@@ -2559,13 +2586,25 @@
// Check if the tip is known to the jobtracker. In case of a restarted
// jt, some tasks might join in later
if (tip != null || hasRestarted()) {
+ JobInProgress job = getJob(taskId.getJobID());
if (tip == null) {
- JobInProgress job = getJob(taskId.getJobID());
tip = job.getTaskInProgress(taskId.getTaskID());
job.addRunningTaskToTIP(tip, taskId, status, false);
}
expireLaunchingTasks.removeTask(taskId);
- tip.getJob().updateTaskStatus(tip, report, myInstrumentation);
+
+ // Update the job and inform the listeners if necessary
+ JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+ job.updateTaskStatus(tip, report, myInstrumentation);
+ JobStatus newStatus = (JobStatus)job.getStatus().clone();
+
+ // Update the listeners if an incomplete job completes
+ if (prevStatus.getRunState() != newStatus.getRunState()) {
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED,
+ prevStatus, newStatus);
+ updateJobInProgressListeners(event);
+ }
} else {
LOG.info("Serious problem. While updating status, cannot find taskid "
+ report.getTaskID());
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=705073&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java Wed Oct 15 15:12:19 2008
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.framework.TestCase;
+
+/**
+ * Test whether the JobInProgressListeners are informed as expected.
+ */
+public class TestJobInProgressListener extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestJobInProgressListener.class);
+
+ // A listener that inits the tasks one at a time and also listens to the
+ // events
+ public static class MyListener extends JobInProgressListener {
+ private List<JobInProgress> wjobs = new ArrayList<JobInProgress>();
+ private List<JobInProgress> jobs = new ArrayList<JobInProgress>();
+
+ public boolean contains (JobID id) {
+ return contains(id, true) || contains(id, false);
+ }
+
+ public boolean contains (JobID id, boolean waiting) {
+ List<JobInProgress> queue = waiting ? wjobs : jobs;
+ for (JobInProgress job : queue) {
+ if (job.getJobID().equals(id)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void jobAdded(JobInProgress job) {
+ LOG.info("Job " + job.getJobID().toString() + " added");
+ wjobs.add(job);
+ }
+
+ public void jobRemoved(JobInProgress job) {
+ LOG.info("Job " + job.getJobID().toString() + " removed");
+ }
+
+ public void jobUpdated(JobChangeEvent event) {
+ LOG.info("Job " + event.getJobInProgress().getJobID().toString() + " updated");
+ // remove the job if the event is for a completed job
+ if (event instanceof JobStatusChangeEvent) {
+ JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
+ if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
+ // check if the state changes from
+ // RUNNING->COMPLETE(SUCCESS/KILLED/FAILED)
+ JobInProgress jip = event.getJobInProgress();
+ String jobId = jip.getJobID().toString();
+ if (statusEvent.getJobInProgress().isComplete()) {
+ LOG.info("Job " + jobId + " deleted from the running queue");
+ jobs.remove(jip);
+ } else {
+ // PREP->RUNNING
+ LOG.info("Job " + jobId + " deleted from the waiting queue");
+ wjobs.remove(jip);
+ jobs.add(jip);
+ }
+ }
+ }
+ }
+ }
+
+ public void testJobFailure() throws Exception {
+ LOG.info("Testing job-success");
+
+ MyListener myListener = new MyListener();
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
+
+ JobConf job = mr.createJobConf();
+
+ mr.getJobTrackerRunner().getJobTracker()
+ .addJobInProgressListener(myListener);
+
+ // submit a job that is expected to fail
+ JobID id = TestJobKillAndFail.runJobFail(job);
+
+ // check if the job failure was notified
+ assertFalse("Missing event notification on failing a running job",
+ myListener.contains(id));
+
+ }
+
+ public void testJobKill() throws Exception {
+ LOG.info("Testing job-kill");
+
+ MyListener myListener = new MyListener();
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
+
+ JobConf job = mr.createJobConf();
+
+ mr.getJobTrackerRunner().getJobTracker()
+ .addJobInProgressListener(myListener);
+
+ // submit and kill the job
+ JobID id = TestJobKillAndFail.runJobKill(job);
+
+ // check if the job kill was notified
+ assertFalse("Missing event notification on killing a running job",
+ myListener.contains(id));
+
+ }
+
+ public void testJobSuccess() throws Exception {
+ LOG.info("Testing job-success");
+ MyListener myListener = new MyListener();
+
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
+
+ JobConf job = mr.createJobConf();
+
+ mr.getJobTrackerRunner().getJobTracker()
+ .addJobInProgressListener(myListener);
+
+ // submit the job
+ RunningJob rJob = TestJobKillAndFail.runJob(job);
+
+ // wait for the job to be running
+ while (rJob.getJobState() != JobStatus.RUNNING) {
+ TestJobTrackerRestart.waitFor(10);
+ }
+
+ LOG.info("Job " + rJob.getID().toString() + " started running");
+
+ // check if the listener was updated about this change
+ assertFalse("Missing event notification for a running job",
+ myListener.contains(rJob.getID(), true));
+
+ while (rJob.getJobState() != JobStatus.SUCCEEDED) {
+ TestJobTrackerRestart.waitFor(10);
+ }
+
+ // check if the job success was notified
+ assertFalse("Missing event notification for a successful job",
+ myListener.contains(rJob.getID(), false));
+ }
+}
\ No newline at end of file
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java Wed Oct 15 15:12:19 2008
@@ -40,7 +40,7 @@
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
- private void runJobFail(JobConf conf) throws IOException {
+ static JobID runJobFail(JobConf conf) throws IOException {
conf.setJobName("testjobfail");
conf.setMapperClass(FailMapper.class);
@@ -55,9 +55,11 @@
}
// Checking that the Job got failed
assertEquals(job.getJobState(), JobStatus.FAILED);
+
+ return job.getID();
}
- private void runJobKill(JobConf conf) throws IOException {
+ static JobID runJobKill(JobConf conf) throws IOException {
conf.setJobName("testjobkill");
conf.setMapperClass(KillMapper.class);
@@ -81,9 +83,11 @@
// Checking that the Job got killed
assertTrue(job.isComplete());
assertEquals(job.getJobState(), JobStatus.KILLED);
+
+ return job.getID();
}
- private RunningJob runJob(JobConf conf) throws IOException {
+ static RunningJob runJob(JobConf conf) throws IOException {
final Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
final Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=705073&r1=705072&r2=705073&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java Wed Oct 15 15:12:19 2008
@@ -29,6 +29,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -42,6 +44,21 @@
private MiniDFSCluster dfsCluster;
private JobConf jc;
private static final String JOB_SCHEDULING_INFO = "TESTSCHEDULINGINFO";
+ private static final Path TEST_DIR = new Path("job-queue-info-testing");
+
+ // configure a waiting job with 2 maps
+ private JobConf configureWaitingJob(JobConf conf) throws IOException {
+ Path inDir = new Path(TEST_DIR, "input");
+ Path shareDir = new Path(TEST_DIR, "share");
+ Path outputDir = new Path(TEST_DIR, "output");
+ String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
+ String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
+ JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL};
+ return TestJobTrackerRestart.getJobs(conf, priority,
+ new int[] {2}, new int[] {0},
+ outputDir, inDir,
+ mapSignalFile, redSignalFile)[0];
+ }
public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler {
@@ -89,10 +106,21 @@
assertEquals(1, queueInfos.length);
assertEquals("default", queueInfos[0].getQueueName());
JobConf conf = mrCluster.createJobConf();
- SleepJob sleepJob = new SleepJob();
- sleepJob.setConf(conf);
- conf = sleepJob.setupJobConf(4, 2, 1, 1, 1, 1);
- JobClient.runJob(conf);
+ FileSystem fileSys = dfsCluster.getFileSystem();
+
+ // configure a waiting job
+ conf = configureWaitingJob(conf);
+ conf.setJobName("test-job-queue-info-test");
+
+ // clear the signal file if any
+ TestJobTrackerRestart.cleanUp(fileSys, TEST_DIR);
+
+ RunningJob rJob = jc.submitJob(conf);
+
+ while (rJob.getJobState() != JobStatus.RUNNING) {
+ TestJobTrackerRestart.waitFor(10);
+ }
+
int numberOfJobs = 0;
for (JobQueueInfo queueInfo : queueInfos) {