Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/10/16 00:15:43 UTC

svn commit: r705075 - in /hadoop/core/branches/branch-0.19: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapr...

Author: omalley
Date: Wed Oct 15 15:15:41 2008
New Revision: 705075

URL: http://svn.apache.org/viewvc?rev=705075&view=rev
Log:
HADOOP-4053. Schedulers must be notified when jobs complete. (Amar Kamat via omalley)

Added:
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java
      - copied unchanged from r705073, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobChangeEvent.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java
      - copied unchanged from r705073, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobStatusChangeEvent.java
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
      - copied unchanged from r705073, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
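
JobChangeEvent and JobStatusChangeEvent were copied unchanged from trunk (r705073), so their sources do not appear in the diff below. As a rough sketch only, reconstructed from how this commit uses them (the constructor arguments, getJobInProgress(), getEventType() and getNewStatus()), the two classes presumably look something like the following; getOldStatus() is assumed by symmetry and the actual trunk code may differ:

    package org.apache.hadoop.mapred;

    // Base event: identifies the job that changed.
    class JobChangeEvent {
      private final JobInProgress jip;

      JobChangeEvent(JobInProgress jip) {
        this.jip = jip;
      }

      JobInProgress getJobInProgress() {
        return jip;
      }
    }

    // Status-change event (a separate source file in the commit): the kind of
    // change plus before/after snapshots of the JobStatus.
    class JobStatusChangeEvent extends JobChangeEvent {
      static enum EventType { RUN_STATE_CHANGED, START_TIME_CHANGED, PRIORITY_CHANGED }

      private final EventType eventType;
      private final JobStatus oldStatus;
      private final JobStatus newStatus;

      JobStatusChangeEvent(JobInProgress jip, EventType eventType,
                           JobStatus oldStatus, JobStatus newStatus) {
        super(jip);
        this.eventType = eventType;
        this.oldStatus = oldStatus;
        this.newStatus = newStatus;
      }

      EventType getEventType() { return eventType; }

      // Assumed by symmetry; only getNewStatus() is exercised in this diff.
      JobStatus getOldStatus() { return oldStatus; }

      JobStatus getNewStatus() { return newStatus; }
    }

The nested EventType enum is what the listeners below switch on to decide between re-ordering a job and removing a completed one.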

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Wed Oct 15 15:15:41 2008
@@ -873,6 +873,8 @@
     HADOOP-4373. Fix calculation of Guaranteed Capacity for the
     capacity-scheduler. (Hemanth Yamijala via acmurthy) 
 
+    HADOOP-4053. Schedulers must be notified when jobs complete. (Amar Kamat via omalley)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Oct 15 15:15:41 2008
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
 import org.apache.hadoop.util.StringUtils;
 
@@ -680,7 +681,6 @@
         // this job is a candidate for running. Initialize it, move it
         // to run queue
         j.initTasks();
-        scheduler.jobQueuesManager.jobUpdated(j);
         // We found a suitable job. Get task from it.
         t = obtainNewTask(taskTracker, j);
         if (t != null) {
@@ -714,7 +714,6 @@
           scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
           if (usersOverLimit.contains(j.getProfile().getUser())) {
             j.initTasks();
-            scheduler.jobQueuesManager.jobUpdated(j);
             t = obtainNewTask(taskTracker, j);
             if (t != null) {
               LOG.debug("Getting task from job " + 
@@ -1144,8 +1143,8 @@
     reduceScheduler.jobAdded(job);
   }
 
-  // called when a job is removed
-  synchronized void jobRemoved(JobInProgress job) {
+  // called when a job completes
+  synchronized void jobCompleted(JobInProgress job) {
     // let our map and reduce schedulers know this, so they can update 
     // user-specific info
     mapScheduler.jobRemoved(job);

Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Wed Oct 15 15:15:41 2008
@@ -27,6 +27,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
 /**
  * A {@link JobInProgressListener} that maintains the jobs being managed in
@@ -173,25 +174,70 @@
     scheduler.jobAdded(job);
   }
 
-  @Override
-  public void jobRemoved(JobInProgress job) {
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
-    if (null == qi) {
-      // can't find queue for job. Shouldn't happen. 
-      LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
-          " when removing job " + job.getProfile().getJobID());
-      return;
-    }
+  private void jobCompleted(JobInProgress job, QueueInfo qi) {
+    LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
+             + job.getProfile().getQueueName() + " has completed");
     // job could be in running or waiting queue
     if (!qi.runningJobs.remove(job)) {
       QueueInfo.removeOb(qi.waitingJobs, job);
     }
     // let scheduler know
-    scheduler.jobRemoved(job);
+    scheduler.jobCompleted(job);
+  }
+  
+  // Note that the job is removed when it completes, i.e. in jobUpdated()
+  @Override
+  public void jobRemoved(JobInProgress job) {}
+  
+  // This is used to reposition a job in the queue. A job can get repositioned
+  // because of a change in its priority or start-time.
+  private void reorderJobs(JobInProgress job, QueueInfo qi) {
+    Collection<JobInProgress> queue = qi.waitingJobs;
+    
+    // Remove from the waiting queue
+    if (!QueueInfo.removeOb(queue, job)) {
+      queue = qi.runningJobs;
+      QueueInfo.removeOb(queue, job);
+    }
+    
+    // Add back to the queue
+    queue.add(job);
+  }
+  
+  // This is used to move a job from the waiting queue to the running queue.
+  private void makeJobRunning(JobInProgress job, QueueInfo qi) {
+    // Remove from the waiting queue
+    QueueInfo.removeOb(qi.waitingJobs, job);
+    
+    // Add the job to the running queue
+    qi.runningJobs.add(job);
+  }
+  
+  // Update the scheduler as job's state has changed
+  private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
+    JobInProgress job = event.getJobInProgress();
+    // Check if the ordering of the job has changed
+    // For now priority and start-time can change the job ordering
+    if (event.getEventType() == EventType.PRIORITY_CHANGED 
+        || event.getEventType() == EventType.START_TIME_CHANGED) {
+      // Make a priority change
+      reorderJobs(job, qi);
+    } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
+      // Check if the job is complete
+      int runState = job.getStatus().getRunState();
+      if (runState == JobStatus.SUCCEEDED
+          || runState == JobStatus.FAILED
+          || runState == JobStatus.KILLED) {
+        jobCompleted(job, qi);
+      } else if (runState == JobStatus.RUNNING) {
+        makeJobRunning(job, qi);
+      }
+    }
   }
   
   @Override
-  public void jobUpdated(JobInProgress job) {
+  public void jobUpdated(JobChangeEvent event) {
+    JobInProgress job = event.getJobInProgress();
     QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
     if (null == qi) {
       // can't find queue for job. Shouldn't happen. 
@@ -199,18 +245,11 @@
           " when updating job " + job.getProfile().getJobID());
       return;
     }
-    // this is called when a job's priority or state is changed.
-    // since we don't know the job's previous state, we need to 
-    // find out in which queue it was earlier, and then place it in the
-    // right queue.
-    Collection<JobInProgress> dest = (job.getStatus().getRunState() == 
-      JobStatus.PREP)? qi.waitingJobs: qi.runningJobs;
-    // We use our own version of removing objects based on referential
-    // equality, since the 'job' object has already been changed. 
-    if (!QueueInfo.removeOb(qi.waitingJobs, job)) {
-      qi.runningJobs.remove(job);
+    
+    // Check if this is a status-change event
+    if (event instanceof JobStatusChangeEvent) {
+      jobStateChanged((JobStatusChangeEvent)event, qi);
     }
-    dest.add(job);
   }
   
 }
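
To see how these hooks fit together, the following hedged sketch shows how a caller could report a finished job to the JobQueuesManager; the helper method itself is hypothetical, but the clone-before/clone-after snapshotting mirrors the pattern used by TestCapacityScheduler and JobTracker later in this commit:

    // Hypothetical helper, assumed to live in org.apache.hadoop.mapred so the
    // package-private types are visible; "manager" and "job" come from the caller.
    static void reportJobFinished(JobQueuesManager manager, JobInProgress job) {
      JobStatus oldStatus = (JobStatus) job.getStatus().clone();  // snapshot before the change
      job.getStatus().setRunState(JobStatus.SUCCEEDED);           // the run-state transition
      JobStatus newStatus = (JobStatus) job.getStatus().clone();  // snapshot after the change

      JobStatusChangeEvent event =
          new JobStatusChangeEvent(job, JobStatusChangeEvent.EventType.RUN_STATE_CHANGED,
                                   oldStatus, newStatus);

      // jobStateChanged() sees the SUCCEEDED run state, removes the job from the
      // running/waiting queue and calls scheduler.jobCompleted(job). A
      // PRIORITY_CHANGED or START_TIME_CHANGED event would instead trigger reorderJobs().
      manager.jobUpdated(event);
    }

Since jobRemoved() is now a no-op for this listener, this RUN_STATE_CHANGED path is the only way a job leaves the capacity scheduler's queues.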

Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Oct 15 15:15:41 2008
@@ -33,6 +33,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 //import org.apache.hadoop.mapred.CapacityTaskScheduler;
 import org.apache.hadoop.conf.Configuration;
 
@@ -289,6 +290,32 @@
       status.setRunState(TaskStatus.State.SUCCEEDED);
     }
     
+    void finalizeJob(FakeJobInProgress fjob) {
+      // take a snapshot of the status before changing it
+      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
+      fjob.getStatus().setRunState(JobStatus.SUCCEEDED);
+      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
+      JobStatusChangeEvent event = 
+        new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus, 
+                                  newStatus);
+      for (JobInProgressListener listener : listeners) {
+        listener.jobUpdated(event);
+      }
+    }
+    
+    public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
+      // take a snapshot of the status before changing it
+      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
+      fjob.setPriority(priority);
+      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
+      JobStatusChangeEvent event = 
+        new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus, 
+                                  newStatus);
+      for (JobInProgressListener listener : listeners) {
+        listener.jobUpdated(event);
+      }
+    }
+    
     void addQueues(String[] arr) {
       Set<String> queues = new HashSet<String>();
       for (String s: arr) {
@@ -415,6 +442,84 @@
     return job;
   }
   
+  // Submit a job and update the listeners
+  private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
+                                             String queue, String user) 
+  throws IOException {
+    FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
+    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    return j;
+  }
+  
+  // Note that there is no concept of setup tasks here. So init itself should 
+  // report the job-status change
+  private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip) 
+  throws IOException {
+    JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+    jip.initTasks();
+    JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+    return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED, 
+                                    oldStatus, newStatus);
+  }
+  
+  // test job run-state change
+  public void testJobRunStateChange() throws IOException {
+    // start the scheduler
+    taskTrackerManager.addQueues(new String[] {"default"});
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 1f, 1, true, 1));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // submit the job
+    FakeJobInProgress fjob1 = 
+      submitJob(JobStatus.PREP, 1, 0, "default", "user");
+    
+    FakeJobInProgress fjob2 = 
+      submitJob(JobStatus.PREP, 1, 0, "default", "user");
+    
+    // check if the job is in the waiting queue
+    assertTrue("Waiting queue doesn't contain queued job",
+                scheduler.jobQueuesManager.getWaitingJobQueue("default")
+                         .contains(fjob1));
+    
+    // change the job priority
+    taskTrackerManager.setPriority(fjob2, JobPriority.HIGH);
+    
+    // Check if the priority changes are reflected
+    JobInProgress firstJob = 
+      scheduler.getJobs("default").toArray(new JobInProgress[0])[0];
+    assertTrue("Priority change did not work as expected",
+               firstJob.getJobID().equals(fjob2.getJobID()));
+    
+    // Create an event
+    JobChangeEvent event = initTasksAndReportEvent(fjob1);
+    
+    // inform the scheduler
+    scheduler.jobQueuesManager.jobUpdated(event);
+    
+    // check if the job is in the running queue
+    assertTrue("Running queue doesn't contain running/inited job",
+                scheduler.jobQueuesManager.getRunningJobQueue("default")
+                         .contains(fjob1));
+    
+    // schedule a task
+    List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+    
+    // complete the job
+    taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(), 
+                                  fjob1);
+    
+    // mark the job as complete
+    taskTrackerManager.finalizeJob(fjob1);
+    
+    // check if the job is removed from the scheduler
+    assertFalse("Scheduler contains completed job", 
+                scheduler.getJobs("default").contains(fjob1));
+  }
+  
   /*protected void submitJobs(int number, int state, int maps, int reduces)
     throws IOException {
     for (int i = 0; i < number; i++) {
@@ -435,14 +540,8 @@
     scheduler.start();
 
     // submit 2 jobs
-    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
-    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
-    
-    // init them and inform the scheduler
-    j1.initTasks();
-    scheduler.jobQueuesManager.jobUpdated(j1);
-    j2.initTasks();
-    scheduler.jobQueuesManager.jobUpdated(j2);
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
+    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
     
     // I. Check multiple assignments with running tasks within job
     // ask for a task from first job
@@ -474,8 +573,7 @@
     
     // IV. Check assignment with completed job
     // finish first job
-    j1.getStatus().setRunState(JobStatus.SUCCEEDED);
-    scheduler.jobRemoved(j1);
+    scheduler.jobCompleted(j1);
     
     // ask for another task from the second job
     // if tasks can be assigned then the structures are properly updated 
@@ -500,12 +598,13 @@
 
     // submit a job with no queue specified. It should be accepted
     // and given to the default queue. 
-    JobInProgress j = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    
     // when we ask for a task, we should get one, from the job submitted
     Task t;
     t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // submit another job, to a different queue
-    j = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // now when we get a task, it should be from the second job
     t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
   }
@@ -522,7 +621,7 @@
     scheduler.start();
     
     // submit a job
-    JobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
+    JobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     
     // submit another job
@@ -610,7 +709,7 @@
     scheduler.start();
 
     // submit a job  
-    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the GC for maps is 2. Since we're the only user, 
     // we should get a task 
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
@@ -637,12 +736,12 @@
     scheduler.start();
 
     // submit a job  
-    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the GC for maps is 2. Since we're the only user, 
     // we should get a task 
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // Submit another job, from a different user
-    submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // Now if I ask for a map task, it should come from the second job 
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
     // Now we're at full capacity for maps. If I ask for another map task,
@@ -666,14 +765,14 @@
     scheduler.start();
 
     // submit a job  
-    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the GC for maps is 2. Since we're the only user, 
     // we should get a task 
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // since we're the only job, we get another map
     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
     // Submit another job, from a different user
-    submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // Now if I ask for a map task, it should come from the second job 
     checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     // and another
@@ -695,7 +794,7 @@
     scheduler.start();
 
     // submit a job  
-    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the GC for maps is 2. Since we're the only user, 
     // we should get a task 
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
@@ -705,7 +804,7 @@
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
     // Submit another job, from a different user
-    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // one of the task finishes
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
     // Now if I ask for a map task, it should come from the second job 
@@ -738,7 +837,7 @@
     taskTrackerManager.addTaskTracker("tt5");
 
     // u1 submits job
-    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     // it gets the first 5 slots
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
@@ -746,7 +845,7 @@
     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
     checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
     // u2 submits job with 4 slots
-    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 4, 4, null, "u2");
+    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
     // u2 should get next 4 slots
     checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
     checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
@@ -793,8 +892,8 @@
 
     // set up a situation where q2 is under capacity, and default & q3
     // are at/over capacity
-    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
-    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
@@ -841,11 +940,11 @@
     taskTrackerManager.addTaskTracker("tt5");
 
     // q2 has nothing running, default is under cap, q3 and q4 are over cap
-    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 2, 2, null, "u1");
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 2, 2, null, "u1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q4", "u1");
+    FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q4", "u1");
     checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
     checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
@@ -893,13 +992,13 @@
 
     // set up a situation where q2 is under capacity, and default is
     // at/over capacity
-    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
     // now submit a job to q2
-    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 1, 1, "q2", "u1");
+    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
     // update our structures
     scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
@@ -908,7 +1007,7 @@
     // we start reclaiming when 15 secs are left. 
     clock.advance(400000);
     // submit another job to q2 which causes more capacity to be reclaimed
-    j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // update our structures
     scheduler.updateQSIInfo();
     clock.advance(200000);

Modified: hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Oct 15 15:15:41 2008
@@ -190,7 +190,7 @@
     }
   
     @Override
-    public void jobUpdated(JobInProgress job) {
+    public void jobUpdated(JobChangeEvent event) {
     }
   }
 

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Wed Oct 15 15:15:41 2008
@@ -25,6 +25,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -132,9 +133,20 @@
   }
 
   @Override
-  public void jobUpdated(JobInProgress job) {
-    synchronized (jobInitQueue) {
-      resortInitQueue();
+  public void jobUpdated(JobChangeEvent event) {
+    if (event instanceof JobStatusChangeEvent) {
+      jobStateChanged((JobStatusChangeEvent)event);
+    }
+  }
+  
+  // called when the job's status is changed
+  private void jobStateChanged(JobStatusChangeEvent event) {
+    // Resort the job queue if the job-start-time or job-priority changes
+    if (event.getEventType() == EventType.START_TIME_CHANGED
+        || event.getEventType() == EventType.PRIORITY_CHANGED) {
+      synchronized (jobInitQueue) {
+        resortInitQueue();
+      }
     }
   }
 

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 15 15:15:41 2008
@@ -2385,4 +2385,11 @@
     boolean initStarted;
     boolean initDone;
   }
+
+  boolean isComplete() {
+    int runState = this.status.getRunState();
+    return runState == JobStatus.SUCCEEDED 
+           || runState == JobStatus.FAILED 
+           || runState == JobStatus.KILLED;
+  }
 }

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java Wed Oct 15 15:15:41 2008
@@ -37,7 +37,8 @@
   
   /**
    * Invoked when a job has been updated in the {@link JobTracker}.
-   * @param job The updated job.
+   * This change in the job is tracked using {@link JobChangeEvent}.
+   * @param event the event that tracks the change
    */
-  public abstract void jobUpdated(JobInProgress job);
+  public abstract void jobUpdated(JobChangeEvent event);
 }
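
For implementors, the practical effect is that jobUpdated() now receives an event wrapper instead of the bare job. A hypothetical jobUpdated() body (the enclosing listener class and its jobAdded()/jobRemoved() methods are omitted) might dispatch like this, which is essentially what the listeners changed in this commit do:

    @Override
    public void jobUpdated(JobChangeEvent event) {
      JobInProgress job = event.getJobInProgress();
      if (event instanceof JobStatusChangeEvent) {
        JobStatusChangeEvent statusEvent = (JobStatusChangeEvent) event;
        switch (statusEvent.getEventType()) {
        case PRIORITY_CHANGED:
        case START_TIME_CHANGED:
          // The job's ordering may have changed; re-sort any internal queue.
          break;
        case RUN_STATE_CHANGED:
          // isComplete() is the new JobInProgress helper added by this commit.
          if (job.isComplete()) {
            // Drop the job from any internal bookkeeping.
          }
          break;
        }
      }
    }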

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java Wed Oct 15 15:15:41 2008
@@ -22,6 +22,8 @@
 import java.util.Comparator;
 import java.util.TreeSet;
 
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+
 /**
  * A {@link JobInProgressListener} that maintains the jobs being managed in
  * a queue. By default the queue is FIFO, but it is possible to use custom
@@ -74,13 +76,38 @@
     jobQueue.add(job);
   }
 
+  // The job will be removed once it completes
   @Override
-  public void jobRemoved(JobInProgress job) {
+  public void jobRemoved(JobInProgress job) {}
+  
+  private void jobCompleted(JobInProgress job) {
     jobQueue.remove(job);
   }
   
   @Override
-  public synchronized void jobUpdated(JobInProgress job) {
+  public synchronized void jobUpdated(JobChangeEvent event) {
+    JobInProgress job = event.getJobInProgress();
+    if (event instanceof JobStatusChangeEvent) {
+      // Check if the ordering of the job has changed
+      // For now priority and start-time can change the job ordering
+      JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
+      if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED 
+          || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
+        // Make a priority change
+        reorderJobs(job);
+      } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
+        // Check if the job is complete
+        int runState = statusEvent.getNewStatus().getRunState();
+        if (runState == JobStatus.SUCCEEDED
+            || runState == JobStatus.FAILED
+            || runState == JobStatus.KILLED) {
+          jobCompleted(job);
+        }
+      }
+    }
+  }
+  
+  private void reorderJobs(JobInProgress job) {
     synchronized (jobQueue) {
       jobQueue.remove(job);
       jobQueue.add(job);

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java Wed Oct 15 15:15:41 2008
@@ -32,7 +32,7 @@
  * not intended to be a comprehensive piece of data.
  * For that, look at JobProfile.
  **************************************************/
-public class JobStatus implements Writable {
+public class JobStatus implements Writable, Cloneable {
 
   static {                                      // register a ctor
     WritableFactories.setFactory
@@ -216,6 +216,16 @@
    */
   synchronized public long getStartTime() { return startTime;}
 
+  @Override
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      // Shouldn't happen since we do implement Cloneable
+      throw new InternalError(cnse.toString());
+    }
+  }
+  
   /**
    * @param user The username of the job
    */

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Oct 15 15:15:41 2008
@@ -57,6 +57,7 @@
 import org.apache.hadoop.mapred.JobHistory.Keys;
 import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -534,25 +535,16 @@
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           jip.initTasks();
-          updateJobListeners();
-        }
-      }
-      
-      private void updateJobListeners() {
-        // The scheduler needs to be informed as the recovery-manager
-        // has inited the jobs
-        for (JobInProgressListener listener : jobInProgressListeners) {
-          listener.jobUpdated(jip);
         }
       }
       
       void close() {
         if (hasUpdates) {
           // Apply the final (job-level) updates
-          updateJob(jip, job);
-          // Update the job listeners as the start/submit time and the job 
-          // priority has changed
-          updateJobListeners();
+          JobStatusChangeEvent event = updateJob(jip, job);
+          
+          // Update the job listeners
+          updateJobInProgressListeners(event);
         }
       }
       
@@ -612,20 +604,29 @@
       }
     }
     
-    private void updateJob(JobInProgress jip, JobHistory.JobInfo job) {
+    private JobStatusChangeEvent updateJob(JobInProgress jip, 
+                                           JobHistory.JobInfo job) {
+      // Change the job priority
+      String jobpriority = job.get(Keys.JOB_PRIORITY);
+      JobPriority priority = JobPriority.valueOf(jobpriority);
+      // It's important to update this via the jobtracker's api as it will 
+      // take care of updating the event listeners too
+      setJobPriority(jip.getJobID(), priority);
+
+      // Save the previous job status
+      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+      
       // Set the start/launch time only if there are recovered tasks
       // Increment the job's restart count
       jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME), 
                         job.getLong(JobHistory.Keys.LAUNCH_TIME),
                         job.getInt(Keys.RESTART_COUNT) + 1);
+
+      // Save the new job status
+      JobStatus newStatus = (JobStatus)jip.getStatus().clone();
       
-      // Change the job priority
-      String jobpriority = job.get(Keys.JOB_PRIORITY);
-      if (jobpriority.length() > 0) {
-        JobPriority priority = JobPriority.valueOf(jobpriority);
-        // Its important to update this via the jobtracker's api
-        setJobPriority(jip.getJobID(), priority);
-      }
+      return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus, 
+                                      newStatus);
     }
     
     private void updateTip(TaskInProgress tip, JobHistory.Task task) {
@@ -1761,6 +1762,13 @@
     jobInProgressListeners.remove(listener);
   }
   
+  // Update the listeners about the job
+  private void updateJobInProgressListeners(JobChangeEvent event) {
+    for (JobInProgressListener listener : jobInProgressListeners) {
+      listener.jobUpdated(event);
+    }
+  }
+  
   /**
    * Return the {@link QueueManager} associated with the JobTracker.
    */
@@ -2263,8 +2271,24 @@
     
   public synchronized void killJob(JobID jobid) throws IOException {
     JobInProgress job = jobs.get(jobid);
+    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
     job.kill();
+    
+    // Inform the listeners if the job is killed
+    // Note : 
+    //   If the job is killed in the PREP state then the listeners will be 
+    //   invoked
+    //   If the job is killed in the RUNNING state then cleanup tasks will be 
+    //   launched and the updateTaskStatuses() will take care of it
+    JobStatus newStatus = (JobStatus)job.getStatus().clone();
+    if (prevStatus.getRunState() != newStatus.getRunState()
+        && newStatus.getRunState() == JobStatus.KILLED) {
+      JobStatusChangeEvent event = 
+        new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+            newStatus);
+      updateJobInProgressListeners(event);
+    }
   }
 
   /**
@@ -2530,10 +2554,13 @@
     JobInProgress job = jobs.get(jobId);
     if (job != null) {
       synchronized (taskScheduler) {
+        JobStatus oldStatus = (JobStatus)job.getStatus().clone();
         job.setPriority(priority);
-        for (JobInProgressListener listener : jobInProgressListeners) {
-          listener.jobUpdated(job);
-        }
+        JobStatus newStatus = (JobStatus)job.getStatus().clone();
+        JobStatusChangeEvent event = 
+          new JobStatusChangeEvent(job, EventType.PRIORITY_CHANGED, oldStatus, 
+                                   newStatus);
+        updateJobInProgressListeners(event);
       }
     } else {
       LOG.warn("Trying to change the priority of an unknown job: " + jobId);
@@ -2559,13 +2586,25 @@
       // Check if the tip is known to the jobtracker. In case of a restarted
       // jt, some tasks might join in later
       if (tip != null || hasRestarted()) {
+        JobInProgress job = getJob(taskId.getJobID());
         if (tip == null) {
-          JobInProgress job = getJob(taskId.getJobID());
           tip = job.getTaskInProgress(taskId.getTaskID());
           job.addRunningTaskToTIP(tip, taskId, status, false);
         }
         expireLaunchingTasks.removeTask(taskId);
-        tip.getJob().updateTaskStatus(tip, report, myInstrumentation);
+        
+        // Update the job and inform the listeners if necessary
+        JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+        job.updateTaskStatus(tip, report, myInstrumentation);
+        JobStatus newStatus = (JobStatus)job.getStatus().clone();
+        
+        // Update the listeners if the job's run state has changed
+        if (prevStatus.getRunState() != newStatus.getRunState()) {
+          JobStatusChangeEvent event = 
+            new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, 
+                                     prevStatus, newStatus);
+          updateJobInProgressListeners(event);
+        }
       } else {
         LOG.info("Serious problem.  While updating status, cannot find taskid " 
                  + report.getTaskID());
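
The clone-old-status / mutate / clone-new-status / notify sequence now appears three times in JobTracker (killJob(), setJobPriority() and updateTaskStatuses()). Purely as a summary, and not part of this commit, the shared shape could be written as a hypothetical helper:

    // Hypothetical JobTracker helper, shown only to summarize the repeated
    // pattern; the committed code inlines this logic at each call site.
    // EventType refers to the import added above for JobStatusChangeEvent.EventType.
    private void changeAndNotify(JobInProgress job, EventType type, Runnable change) {
      JobStatus oldStatus = (JobStatus) job.getStatus().clone();   // snapshot before
      change.run();                                                // e.g. job.kill() or job.setPriority(p)
      JobStatus newStatus = (JobStatus) job.getStatus().clone();   // snapshot after
      // For run-state events, listeners are only told about a real transition.
      if (type != EventType.RUN_STATE_CHANGED
          || oldStatus.getRunState() != newStatus.getRunState()) {
        updateJobInProgressListeners(
            new JobStatusChangeEvent(job, type, oldStatus, newStatus));
      }
    }

killJob() is stricter than this sketch in that it only notifies when the job lands directly in KILLED, relying on updateTaskStatuses() to report the transition for jobs that were already running.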

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java Wed Oct 15 15:15:41 2008
@@ -40,7 +40,7 @@
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
 
-  private void runJobFail(JobConf conf) throws IOException {
+  static JobID runJobFail(JobConf conf) throws IOException {
 
     conf.setJobName("testjobfail");
     conf.setMapperClass(FailMapper.class);
@@ -55,9 +55,11 @@
     }
     // Checking that the Job got failed
     assertEquals(job.getJobState(), JobStatus.FAILED);
+    
+    return job.getID();
   }
 
-  private void runJobKill(JobConf conf) throws IOException {
+  static JobID runJobKill(JobConf conf) throws IOException {
 
     conf.setJobName("testjobkill");
     conf.setMapperClass(KillMapper.class);
@@ -81,9 +83,11 @@
     // Checking that the Job got killed
     assertTrue(job.isComplete());
     assertEquals(job.getJobState(), JobStatus.KILLED);
+    
+    return job.getID();
   }
 
-  private RunningJob runJob(JobConf conf) throws IOException {
+  static RunningJob runJob(JobConf conf) throws IOException {
 
     final Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
     final Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=705075&r1=705074&r2=705075&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java Wed Oct 15 15:15:41 2008
@@ -29,6 +29,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -42,6 +44,21 @@
   private MiniDFSCluster dfsCluster;
   private JobConf jc;
   private static final String JOB_SCHEDULING_INFO = "TESTSCHEDULINGINFO";
+  private static final Path TEST_DIR = new Path("job-queue-info-testing");
+  
+  // configure a waiting job with 2 maps
+  private JobConf configureWaitingJob(JobConf conf) throws IOException {
+    Path inDir = new Path(TEST_DIR, "input");
+    Path shareDir = new Path(TEST_DIR, "share");
+    Path outputDir = new Path(TEST_DIR, "output");
+    String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
+    String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
+    JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL};
+    return TestJobTrackerRestart.getJobs(conf, priority, 
+                                         new int[] {2}, new int[] {0}, 
+                                         outputDir, inDir, 
+                                         mapSignalFile, redSignalFile)[0];
+  }
 
   public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler {
 
@@ -89,10 +106,21 @@
     assertEquals(1, queueInfos.length);
     assertEquals("default", queueInfos[0].getQueueName());
     JobConf conf = mrCluster.createJobConf();
-    SleepJob sleepJob = new SleepJob();
-    sleepJob.setConf(conf);
-    conf = sleepJob.setupJobConf(4, 2, 1, 1, 1, 1);
-    JobClient.runJob(conf);
+    FileSystem fileSys = dfsCluster.getFileSystem();
+    
+    // configure a waiting job
+    conf = configureWaitingJob(conf);
+    conf.setJobName("test-job-queue-info-test");
+    
+    // clear the signal file if any
+    TestJobTrackerRestart.cleanUp(fileSys, TEST_DIR);
+    
+    RunningJob rJob = jc.submitJob(conf);
+    
+    while (rJob.getJobState() != JobStatus.RUNNING) {
+      TestJobTrackerRestart.waitFor(10);
+    }
+    
     int numberOfJobs = 0;
 
     for (JobQueueInfo queueInfo : queueInfos) {