You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/10/21 01:51:12 UTC

svn commit: r706461 - in /hadoop/core/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: omalley
Date: Mon Oct 20 16:51:12 2008
New Revision: 706461

URL: http://svn.apache.org/viewvc?rev=706461&view=rev
Log:
HADOOP-4149. Fix handling of updates to the job priority, by changing the
list of jobs to be keyed by the priority, submit time, and job tracker id.
(Amar Kamat via omalley)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=706461&r1=706460&r2=706461&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Oct 20 16:51:12 2008
@@ -989,6 +989,10 @@
     HADOOP-4404. saveFSImage() removes files from a storage directory that do 
     not correspond to its type. (shv)
 
+    HADOOP-4149. Fix handling of updates to the job priority, by changing the
+    list of jobs to be keyed by the priority, submit time, and job tracker id.
+    (Amar Kamat via omalley)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=706461&r1=706460&r2=706461&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Mon Oct 20 16:51:12 2008
@@ -20,13 +20,13 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.TreeSet;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
 /**
@@ -45,35 +45,17 @@
    * is ahead in the queue, so insertion should be at the tail.
    */
   
-  // comparator for jobs in queues that support priorities
-  private static final Comparator<JobInProgress> PRIORITY_JOB_COMPARATOR
-    = new Comparator<JobInProgress>() {
-    public int compare(JobInProgress o1, JobInProgress o2) {
-      // Look at priority.
-      int res = o1.getPriority().compareTo(o2.getPriority());
-      if (res == 0) {
-        // the job that started earlier wins
-        if (o1.getStartTime() < o2.getStartTime()) {
-          res = -1;
-        } else {
-          res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
-        }
-      }
-      if (res == 0) {
-        res = o1.getJobID().compareTo(o2.getJobID());
-      }
-      return res;
-    }
-  };
   // comparator for jobs in queues that don't support priorities
-  private static final Comparator<JobInProgress> STARTTIME_JOB_COMPARATOR
-    = new Comparator<JobInProgress>() {
-    public int compare(JobInProgress o1, JobInProgress o2) {
+  private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
+    = new Comparator<JobSchedulingInfo>() {
+    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
       // the job that started earlier wins
       if (o1.getStartTime() < o2.getStartTime()) {
         return -1;
       } else {
-        return (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
+        return (o1.getStartTime() == o2.getStartTime() 
+                ? o1.getJobID().compareTo(o2.getJobID()) 
+                : 1);
       }
     }
   };
@@ -83,40 +65,27 @@
 
     // whether the queue supports priorities
     boolean supportsPriorities;
-    // maintain separate collections of running & waiting jobs. This we do 
+    // maintain separate structures for running & waiting jobs. This we do 
     // mainly because when a new job is added, it cannot superceede a running 
     // job, even though the latter may be a lower priority. If this is ever
     // changed, we may get by with one collection. 
-    Collection<JobInProgress> waitingJobs;
+    Map<JobSchedulingInfo, JobInProgress> waitingJobs;
     Collection<JobInProgress> runningJobs;
     
     QueueInfo(boolean prio) {
       this.supportsPriorities = prio;
       if (supportsPriorities) {
-        this.waitingJobs = new TreeSet<JobInProgress>(PRIORITY_JOB_COMPARATOR);
+        // use the default priority-aware comparator
+        this.waitingJobs = 
+          new TreeMap<JobSchedulingInfo, JobInProgress>(
+              JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
       }
       else {
-        this.waitingJobs = new TreeSet<JobInProgress>(STARTTIME_JOB_COMPARATOR);
+        this.waitingJobs = 
+          new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
       }
       this.runningJobs = new LinkedList<JobInProgress>();
     }
-
-    /**
-     * we need to delete an object from our TreeSet based on referential
-     * equality, rather than value equality that the TreeSet uses. 
-     * Another way to do this is to extend the TreeSet and override remove().
-     */
-    static private boolean removeOb(Collection<JobInProgress> c, Object o) {
-      Iterator<JobInProgress> i = c.iterator();
-      while (i.hasNext()) {
-          if (i.next() == o) {
-              i.remove();
-              return true;
-          }
-      }
-      return false;
-    }
-    
   }
   
   // we maintain a hashmap of queue-names to queue info
@@ -150,7 +119,7 @@
    * Returns the queue of waiting jobs associated with the name
    */
   public Collection<JobInProgress> getWaitingJobQueue(String queueName) {
-    return jobQueues.get(queueName).waitingJobs;
+    return jobQueues.get(queueName).waitingJobs.values();
   }
   
   @Override
@@ -167,19 +136,18 @@
     }
     // add job to waiting queue. It will end up in the right place, 
     // based on priority. 
-    // We use our own version of removing objects based on referential
-    // equality, since the 'job' object has already been changed. 
-    qi.waitingJobs.add(job);
+    qi.waitingJobs.put(new JobSchedulingInfo(job), job);
     // let scheduler know. 
     scheduler.jobAdded(job);
   }
 
-  private void jobCompleted(JobInProgress job, QueueInfo qi) {
+  private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, 
+                            QueueInfo qi) {
     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
              + job.getProfile().getQueueName() + " has completed");
     // job could be in running or waiting queue
     if (!qi.runningJobs.remove(job)) {
-      QueueInfo.removeOb(qi.waitingJobs, job);
+      qi.waitingJobs.remove(oldInfo);
     }
     // let scheduler know
     scheduler.jobCompleted(job);
@@ -191,23 +159,24 @@
   
   // This is used to reposition a job in the queue. A job can get repositioned 
   // because of the change in the job priority or job start-time.
-  private void reorderJobs(JobInProgress job, QueueInfo qi) {
-    Collection<JobInProgress> queue = qi.waitingJobs;
+  private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
+                           QueueInfo qi) {
     
-    // Remove from the waiting queue
-    if (!QueueInfo.removeOb(queue, job)) {
-      queue = qi.runningJobs;
-      QueueInfo.removeOb(queue, job);
+    if (qi.waitingJobs.remove(oldInfo) == null) {
+      qi.runningJobs.remove(job);
+      // Add back to the running queue
+      qi.runningJobs.add(job);
+    } else {
+      // Add back to the waiting queue
+      qi.waitingJobs.put(new JobSchedulingInfo(job), job);
     }
-    
-    // Add back to the queue
-    queue.add(job);
   }
   
   // This is used to move a job from the waiting queue to the running queue.
-  private void makeJobRunning(JobInProgress job, QueueInfo qi) {
+  private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, 
+                              QueueInfo qi) {
     // Remove from the waiting queue
-    QueueInfo.removeOb(qi.waitingJobs, job);
+    qi.waitingJobs.remove(oldInfo);
     
     // Add the job to the running queue
     qi.runningJobs.add(job);
@@ -216,21 +185,23 @@
   // Update the scheduler as job's state has changed
   private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
     JobInProgress job = event.getJobInProgress();
+    JobSchedulingInfo oldJobStateInfo = 
+      new JobSchedulingInfo(event.getOldStatus());
     // Check if the ordering of the job has changed
     // For now priority and start-time can change the job ordering
     if (event.getEventType() == EventType.PRIORITY_CHANGED 
         || event.getEventType() == EventType.START_TIME_CHANGED) {
       // Make a priority change
-      reorderJobs(job, qi);
+      reorderJobs(job, oldJobStateInfo, qi);
     } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
       // Check if the job is complete
       int runState = job.getStatus().getRunState();
       if (runState == JobStatus.SUCCEEDED
           || runState == JobStatus.FAILED
           || runState == JobStatus.KILLED) {
-        jobCompleted(job, qi);
+        jobCompleted(job, oldJobStateInfo, qi);
       } else if (runState == JobStatus.RUNNING) {
-        makeJobRunning(job, qi);
+        makeJobRunning(job, oldJobStateInfo, qi);
       }
     }
   }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=706461&r1=706460&r2=706461&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon Oct 20 16:51:12 2008
@@ -57,8 +57,9 @@
       super(jId, jobConf);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
-      this.status = new JobStatus();
-      this.status.setRunState(JobStatus.PREP);
+      this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP);
+      this.status.setJobPriority(JobPriority.NORMAL);
+      this.status.setStartTime(startTime);
       if (null == jobConf.getQueueName()) {
         this.profile = new JobProfile(user, jId, 
             null, null, null);
@@ -316,6 +317,23 @@
       }
     }
     
+    public void setStartTime(FakeJobInProgress fjob, long start) {
+      // clone the status first so the event can carry the pre-change values
+      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
+      
+      fjob.startTime = start; // update the job's own start-time field
+      fjob.status.setStartTime(start); // keep the job's status in step with it
+      
+      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
+      
+      JobStatusChangeEvent event = 
+        new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus,
+                                  newStatus);
+      for (JobInProgressListener listener : listeners) { // notify all listeners
+        listener.jobUpdated(event);
+      }
+    }
+    
     void addQueues(String[] arr) {
       Set<String> queues = new HashSet<String>();
       for (String s: arr) {
@@ -484,18 +502,44 @@
       submitJob(JobStatus.PREP, 1, 0, "default", "user");
     
     // check if the job is in the waiting queue
-    assertTrue("Waiting queue doesnt contain queued job", 
-                scheduler.jobQueuesManager.getWaitingJobQueue("default")
-                         .contains(fjob1));
-    
-    // change the job priority
-    taskTrackerManager.setPriority(fjob2, JobPriority.HIGH);
+    JobInProgress[] jobs = 
+      scheduler.jobQueuesManager.getWaitingJobQueue("default")
+               .toArray(new JobInProgress[0]);
+    assertTrue("Waiting queue doesnt contain queued job #1 in right order", 
+                jobs[0].getJobID().equals(fjob1.getJobID()));
+    assertTrue("Waiting queue doesnt contain queued job #2 in right order", 
+                jobs[1].getJobID().equals(fjob2.getJobID()));
+    
+    // I. Check the start-time change
+    // Change job2 start-time and check if job2 bumps up in the queue 
+    taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
+    
+    jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default")
+                    .toArray(new JobInProgress[0]);
+    assertTrue("Start time change didnt not work as expected for job #2", 
+                jobs[0].getJobID().equals(fjob2.getJobID()));
+    assertTrue("Start time change didnt not work as expected for job #1", 
+                jobs[1].getJobID().equals(fjob1.getJobID()));
+    
+    // check if the queue is fine
+    assertEquals("Start-time change garbled the waiting queue", 
+                 2, scheduler.getJobs("default").size());
+    
+    // II. Check the job priority change
+    // Bump up job1's priority and make sure job1 bumps up in the queue
+    taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
     
     // Check if the priority changes are reflected
-    JobInProgress firstJob = 
-      scheduler.getJobs("default").toArray(new JobInProgress[0])[0];
-    assertTrue("Priority change didnt not work as expected", 
-               firstJob.getJobID().equals(fjob2.getJobID()));
+    jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default")
+                    .toArray(new JobInProgress[0]);
+    assertTrue("Priority change didnt not work as expected for job #1", 
+                jobs[0].getJobID().equals(fjob1.getJobID()));
+    assertTrue("Priority change didnt not work as expected for job #2", 
+                jobs[1].getJobID().equals(fjob2.getJobID()));
+    
+    // check if the queue is fine
+    assertEquals("Priority change has garbled the waiting queue", 
+                 2, scheduler.getJobs("default").size());
     
     // Create an event
     JobChangeEvent event = initTasksAndReportEvent(fjob1);
@@ -503,10 +547,26 @@
     // inform the scheduler
     scheduler.jobQueuesManager.jobUpdated(event);
     
+    // waiting queue
+    Collection<JobInProgress> wqueue = 
+      scheduler.jobQueuesManager.getWaitingJobQueue("default");
+    
+    // check if the job is not in the waiting queue
+    assertFalse("Waiting queue contains running/inited job", 
+                wqueue.contains(fjob1));
+    
+    // check if the waiting queue is fine
+    assertEquals("Waiting queue is garbled on job init", 1, wqueue.size());
+    
+    Collection<JobInProgress> rqueue = 
+      scheduler.jobQueuesManager.getRunningJobQueue("default");
+    
     // check if the job is in the running queue
     assertTrue("Running queue doesnt contain running/inited job", 
-                scheduler.jobQueuesManager.getRunningJobQueue("default")
-                         .contains(fjob1));
+                rqueue.contains(fjob1));
+    
+    // check if the running queue is fine
+    assertEquals("Running queue is garbled upon init", 1, rqueue.size());
     
     // schedule a task
     List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
@@ -518,9 +578,16 @@
     // mark the job as complete
     taskTrackerManager.finalizeJob(fjob1);
     
+    rqueue = scheduler.jobQueuesManager.getRunningJobQueue("default");
+    
     // check if the job is removed from the scheduler
     assertFalse("Scheduler contains completed job", 
-                scheduler.getJobs("default").contains(fjob1));
+                rqueue.contains(fjob1));
+    
+    // check if the running queue size is correct
+    assertEquals("Job finish garbles the queue", 
+                 0, rqueue.size());
+    
   }
   
   /*protected void submitJobs(int number, int state, int maps, int reduces)

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=706461&r1=706460&r2=706461&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Oct 20 16:51:12 2008
@@ -198,6 +198,7 @@
     this.jobtracker = jobtracker;
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
+    status.setStartTime(startTime);
     this.localFs = FileSystem.getLocal(default_conf);
 
     JobConf default_job_conf = new JobConf(default_conf);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java?rev=706461&r1=706460&r2=706461&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java Mon Oct 20 16:51:12 2008
@@ -20,7 +20,8 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.TreeSet;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
@@ -32,9 +33,32 @@
  */
 class JobQueueJobInProgressListener extends JobInProgressListener {
 
-  private static final Comparator<JobInProgress> FIFO_JOB_QUEUE_COMPARATOR
-    = new Comparator<JobInProgress>() {
-    public int compare(JobInProgress o1, JobInProgress o2) {
+  /** Immutable snapshot of the priority, start time and ID held in a job's 
+   * {@link JobStatus}. Has no equals()/hashCode(): key sorted maps only.
+   */ 
+  static class JobSchedulingInfo {
+    private final JobPriority priority;
+    private final long startTime;
+    private final JobID id;
+    
+    public JobSchedulingInfo(JobInProgress jip) {
+      this(jip.getStatus()); // snapshot of the job's current status
+    }
+    
+    public JobSchedulingInfo(JobStatus status) {
+      priority = status.getJobPriority();
+      startTime = status.getStartTime();
+      id = status.getJobID();
+    }
+    
+    JobPriority getPriority() {return priority;}
+    long getStartTime() {return startTime;}
+    JobID getJobID() {return id;}
+  }
+  
+  static final Comparator<JobSchedulingInfo> FIFO_JOB_QUEUE_COMPARATOR
+    = new Comparator<JobSchedulingInfo>() {
+    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
       int res = o1.getPriority().compareTo(o2.getPriority());
       if (res == 0) {
         if (o1.getStartTime() < o2.getStartTime()) {
@@ -50,38 +74,40 @@
     }
   };
   
-  private Collection<JobInProgress> jobQueue;
+  private Map<JobSchedulingInfo, JobInProgress> jobQueue;
   
   public JobQueueJobInProgressListener() {
-    this(new TreeSet<JobInProgress>(FIFO_JOB_QUEUE_COMPARATOR));
+    this(new TreeMap<JobSchedulingInfo, 
+                     JobInProgress>(FIFO_JOB_QUEUE_COMPARATOR));
   }
 
   /**
    * For clients that want to provide their own job priorities.
    * @param jobQueue A collection whose iterator returns jobs in priority order.
    */
-  protected JobQueueJobInProgressListener(Collection<JobInProgress> jobQueue) {
-    this.jobQueue = Collections.synchronizedCollection(jobQueue);
+  protected JobQueueJobInProgressListener(Map<JobSchedulingInfo, 
+                                          JobInProgress> jobQueue) {
+    this.jobQueue = Collections.synchronizedMap(jobQueue);
   }
 
   /**
    * Returns a synchronized view of the the job queue.
    */
   public Collection<JobInProgress> getJobQueue() {
-    return jobQueue;
+    return jobQueue.values();
   }
   
   @Override
   public void jobAdded(JobInProgress job) {
-    jobQueue.add(job);
+    jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
   }
 
   // Job will be removed once the job completes
   @Override
   public void jobRemoved(JobInProgress job) {}
   
-  private void jobCompleted(JobInProgress job) {
-    jobQueue.remove(job);
+  private void jobCompleted(JobSchedulingInfo oldInfo) {
+    jobQueue.remove(oldInfo);
   }
   
   @Override
@@ -91,26 +117,28 @@
       // Check if the ordering of the job has changed
       // For now priority and start-time can change the job ordering
       JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
+      JobSchedulingInfo oldInfo =  
+        new JobSchedulingInfo(statusEvent.getOldStatus());
       if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED 
           || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
         // Make a priority change
-        reorderJobs(job);
+        reorderJobs(job, oldInfo);
       } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
         // Check if the job is complete
         int runState = statusEvent.getNewStatus().getRunState();
         if (runState == JobStatus.SUCCEEDED
             || runState == JobStatus.FAILED
             || runState == JobStatus.KILLED) {
-          jobCompleted(job);
+          jobCompleted(oldInfo);
         }
       }
     }
   }
   
-  private void reorderJobs(JobInProgress job) {
+  private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo) {
     synchronized (jobQueue) {
-      jobQueue.remove(job);
-      jobQueue.add(job);
+      jobQueue.remove(oldInfo);
+      jobQueue.put(new JobSchedulingInfo(job), job);
     }
   }
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=706461&r1=706460&r2=706461&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java Mon Oct 20 16:51:12 2008
@@ -19,8 +19,12 @@
 package org.apache.hadoop.mapred;
 
 import java.util.ArrayList;
+import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +37,171 @@
 public class TestJobInProgressListener extends TestCase {
   private static final Log LOG = 
     LogFactory.getLog(TestJobInProgressListener.class);
+  private final Path testDir = new Path("test-jip-listener-update");
+  
+  private JobConf configureJob(JobConf conf, int m, int r, 
+                               Path inDir, Path outputDir,
+                               String mapSignalFile, String redSignalFile) 
+  throws IOException { // m, r: presumably map/reduce counts -- TODO confirm
+    TestJobTrackerRestart.configureWaitingJobConf(conf, inDir, outputDir, 
+                                                  m, r, "job-listener-test", 
+                                                  mapSignalFile, redSignalFile);
+    return conf; // the same object that was passed in
+  }
+  
+  /**
+   * Checks that external updates to a {@link JobInProgress} leave the 
+   * listener's job queue consistent and correctly ordered:
+   *   - submit 2 jobs of normal priority; job1 is a waiting job that blocks 
+   *     the cluster
+   *   - change job2's priority, then its start time, so that it moves ahead 
+   *     of job1, checking the queue size and order each time
+   *   - let both jobs finish and check that the queue is empty
+   * NOTE(review): the mini DFS/MR clusters are never shut down in this test.
+   */
+  public void testJobQueueChanges() throws IOException {
+    LOG.info("Testing job queue changes");
+    JobConf conf = new JobConf();
+    MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null, null);
+    dfs.waitActive();
+    FileSystem fileSys = dfs.getFileSystem();
+    
+    dfs.startDataNodes(conf, 1, true, null, null, null, null);
+    dfs.waitActive();
+    
+    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
+                      + (dfs.getFileSystem()).getUri().getPort();
+    MiniMRCluster mr = new MiniMRCluster(1, namenode, 1);
+    JobClient jobClient = new JobClient(mr.createJobConf());
+    
+    // start from an empty test directory
+    fileSys.delete(testDir, true);
+    
+    if (!fileSys.mkdirs(testDir)) {
+      throw new IOException("Mkdirs failed to create " + testDir.toString());
+    }
+
+    // Write the input file
+    Path inDir = new Path(testDir, "input");
+    Path shareDir = new Path(testDir, "share");
+    String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
+    String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
+    TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, 
+                                         new Path(inDir + "/file"), 
+                                         (short)1);
+    
+    JobQueueJobInProgressListener myListener = 
+      new JobQueueJobInProgressListener();
+    
+    // register the listener under test with the job tracker
+    mr.getJobTrackerRunner().getJobTracker()
+      .addJobInProgressListener(myListener);
+    
+    // big blocking job
+    Path outputDir = new Path(testDir, "output");
+    Path newOutputDir = outputDir.suffix("0");
+    JobConf job1 = configureJob(mr.createJobConf(), 10, 0, inDir, newOutputDir,
+                                mapSignalFile, redSignalFile);
+    
+    // short blocked job
+    newOutputDir = outputDir.suffix("1");
+    JobConf job2 = configureJob(mr.createJobConf(), 1, 0, inDir, newOutputDir,
+                                mapSignalFile, redSignalFile);
+    
+    RunningJob rJob1 = jobClient.submitJob(job1);
+    LOG.info("Running job " + rJob1.getID().toString());
+    
+    RunningJob rJob2 = jobClient.submitJob(job2);
+    LOG.info("Running job " + rJob2.getID().toString());
+    
+    // I. Check job-priority change
+    LOG.info("Testing job priority changes");
+    
+    // bump up job2's priority
+    LOG.info("Increasing job2's priority to HIGH");
+    rJob2.setJobPriority("HIGH");
+    
+    // the queue must still hold exactly the two submitted jobs
+    assertEquals("Priority change garbles the queue", 
+                 2, myListener.getJobQueue().size());
+    
+    JobInProgress[] queue = 
+      myListener.getJobQueue().toArray(new JobInProgress[0]);
+    
+    // check if the bump has happened
+    assertTrue("Priority change failed to bump up job2 in the queue", 
+               queue[0].getJobID().equals(rJob2.getID()));
+    
+    assertTrue("Priority change failed to bump down job1 in the queue", 
+               queue[1].getJobID().equals(rJob1.getID()));
+    
+    assertEquals("Priority change has garbled the queue", 
+                 2, queue.length);
+    
+    // II. Check start-time change
+    LOG.info("Testing job start-time changes");
+    
+    // put job2 back to NORMAL so that the queue order is again
+    //  - job1
+    //  - job2
+    // which lets the start-time change below move job2 ahead of job1
+    LOG.info("Resetting job2's priority to NORMAL"); 
+    rJob2.setJobPriority("NORMAL");
+    
+    // look up both jobs in the job tracker
+    JobInProgress jip2 = mr.getJobTrackerRunner().getJobTracker()
+                          .getJob(rJob2.getID());
+    JobInProgress jip1 = mr.getJobTrackerRunner().getJobTracker()
+                           .getJob(rJob1.getID());
+    
+    JobStatus prevStatus = (JobStatus)jip2.getStatus().clone();
+    
+    // change job2's start-time and the status
+    jip2.startTime =  jip1.startTime - 1;
+    jip2.status.setStartTime(jip2.startTime);
+    
+    
+    JobStatus newStatus = (JobStatus)jip2.getStatus().clone();
+    
+    // inform the listener
+    LOG.info("Updating the listener about job2's start-time change");
+    JobStatusChangeEvent event = 
+      new JobStatusChangeEvent(jip2, EventType.START_TIME_CHANGED, 
+                              prevStatus, newStatus);
+    myListener.jobUpdated(event);
+    
+    // the queue must still hold exactly the two submitted jobs
+    assertEquals("Start time change garbles the queue", 
+                 2, myListener.getJobQueue().size());
+    
+    queue = myListener.getJobQueue().toArray(new JobInProgress[0]);
+    
+    // check if the bump has happened
+    assertTrue("Start time change failed to bump up job2 in the queue", 
+               queue[0].getJobID().equals(rJob2.getID()));
+    
+    assertTrue("Start time change failed to bump down job1 in the queue", 
+               queue[1].getJobID().equals(rJob1.getID()));
+    
+    assertEquals("Start time change has garbled the queue", 
+                 2, queue.length);
+    
+    // signal the maps to complete
+    TestJobTrackerRestart.signalTasks(dfs, fileSys, true, 
+                                      mapSignalFile, redSignalFile);
+    
+    // wait for both jobs to succeed; these loops never end if a job fails
+    while (rJob2.getJobState() != JobStatus.SUCCEEDED) {
+      TestJobTrackerRestart.waitFor(10);
+    }
+    
+    while (rJob1.getJobState() != JobStatus.SUCCEEDED) {
+      TestJobTrackerRestart.waitFor(10);
+    }
+    
+    assertEquals("Job completion garbles the queue", 
+                 0, myListener.getJobQueue().size());
+  }
   
   // A listener that inits the tasks one at a time and also listens to the 
   // events

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=706461&r1=706460&r2=706461&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Mon Oct 20 16:51:12 2008
@@ -42,8 +42,9 @@
       super(new JobID("test", ++jobCounter), jobConf);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
-      this.status = new JobStatus();
-      this.status.setRunState(JobStatus.PREP);
+      this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
+      this.status.setJobPriority(JobPriority.NORMAL);
+      this.status.setStartTime(startTime);
     }
     
     @Override