You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/11/13 14:59:05 UTC

svn commit: r713723 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: yhemanth
Date: Thu Nov 13 05:59:04 2008
New Revision: 713723

URL: http://svn.apache.org/viewvc?rev=713723&view=rev
Log:
HADOOP-4471. Sort running jobs by priority in the capacity scheduler. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=713723&r1=713722&r2=713723&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Nov 13 05:59:04 2008
@@ -1118,7 +1118,10 @@
     HADOOP-4595. Fixes two race conditions - one to do with updating free slot count,
     and another to do with starting the MapEventsFetcher thread. (ddas)
 
-    HADOOP-4552. Fix a deadlock in RPC server. (Raghu Angadi) 
+    HADOOP-4552. Fix a deadlock in RPC server. (Raghu Angadi)
+
+    HADOOP-4471. Sort running jobs by priority in the capacity scheduler.
+    (Amar Kamat via yhemanth) 
 
 Release 0.18.3 - Unreleased
 

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=713723&r1=713722&r2=713723&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Thu Nov 13 05:59:04 2008
@@ -20,7 +20,6 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -36,13 +35,11 @@
 class JobQueuesManager extends JobInProgressListener {
 
   /* 
-   * If a queue supports priorities, waiting jobs must be 
+   * If a queue supports priorities, jobs must be 
    * sorted on priorities, and then on their start times (technically, 
    * their insertion time.  
-   * If a queue doesn't support priorities, waiting jobs are
+   * If a queue doesn't support priorities, jobs are
    * sorted based on their start time.  
-   * Running jobs are not sorted. A job that started running earlier
-   * is ahead in the queue, so insertion should be at the tail.
    */
   
   // comparator for jobs in queues that don't support priorities
@@ -65,12 +62,8 @@
 
     // whether the queue supports priorities
     boolean supportsPriorities;
-    // maintain separate structures for running & waiting jobs. This we do 
-    // mainly because when a new job is added, it cannot superceede a running 
-    // job, even though the latter may be a lower priority. If this is ever
-    // changed, we may get by with one collection. 
-    Map<JobSchedulingInfo, JobInProgress> waitingJobs;
-    Collection<JobInProgress> runningJobs;
+    Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
+    Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
     
     QueueInfo(boolean prio) {
       this.supportsPriorities = prio;
@@ -79,12 +72,16 @@
         this.waitingJobs = 
           new TreeMap<JobSchedulingInfo, JobInProgress>(
               JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
+        this.runningJobs = 
+          new TreeMap<JobSchedulingInfo, JobInProgress>(
+              JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
       }
       else {
         this.waitingJobs = 
           new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
+        this.runningJobs = 
+          new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
       }
-      this.runningJobs = new LinkedList<JobInProgress>();
     }
   }
   
@@ -112,7 +109,7 @@
    * Returns the queue of running jobs associated with the name
    */
   public Collection<JobInProgress> getRunningJobQueue(String queueName) {
-    return jobQueues.get(queueName).runningJobs;
+    return jobQueues.get(queueName).runningJobs.values();
   }
   
   /**
@@ -146,7 +143,7 @@
     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
              + job.getProfile().getQueueName() + " has completed");
     // job could be in running or waiting queue
-    if (!qi.runningJobs.remove(job)) {
+    if (qi.runningJobs.remove(oldInfo) != null) {
       qi.waitingJobs.remove(oldInfo);
     }
     // let scheduler know
@@ -162,13 +159,14 @@
   private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
                            QueueInfo qi) {
     
+    JobSchedulingInfo newInfo = new JobSchedulingInfo(job);
     if (qi.waitingJobs.remove(oldInfo) == null) {
-      qi.runningJobs.remove(job);
+      qi.runningJobs.remove(oldInfo);
       // Add back to the running queue
-      qi.runningJobs.add(job);
+      qi.runningJobs.put(newInfo, job);
     } else {
       // Add back to the waiting queue
-      qi.waitingJobs.put(new JobSchedulingInfo(job), job);
+      qi.waitingJobs.put(newInfo, job);
     }
   }
   
@@ -179,7 +177,7 @@
     qi.waitingJobs.remove(oldInfo);
     
     // Add the job to the running queue
-    qi.runningJobs.add(job);
+    qi.runningJobs.put(new JobSchedulingInfo(job), job);
   }
   
   // Update the scheduler as job's state has changed

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=713723&r1=713722&r2=713723&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Nov 13 05:59:04 2008
@@ -501,72 +501,34 @@
     FakeJobInProgress fjob2 = 
       submitJob(JobStatus.PREP, 1, 0, "default", "user");
     
-    // check if the job is in the waiting queue
-    JobInProgress[] jobs = 
-      scheduler.jobQueuesManager.getWaitingJobQueue("default")
-               .toArray(new JobInProgress[0]);
-    assertTrue("Waiting queue doesnt contain queued job #1 in right order", 
-                jobs[0].getJobID().equals(fjob1.getJobID()));
-    assertTrue("Waiting queue doesnt contain queued job #2 in right order", 
-                jobs[1].getJobID().equals(fjob2.getJobID()));
-    
-    // I. Check the start-time change
-    // Change job2 start-time and check if job2 bumps up in the queue 
-    taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
-    
-    jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default")
-                    .toArray(new JobInProgress[0]);
-    assertTrue("Start time change didnt not work as expected for job #2", 
-                jobs[0].getJobID().equals(fjob2.getJobID()));
-    assertTrue("Start time change didnt not work as expected for job #1", 
-                jobs[1].getJobID().equals(fjob1.getJobID()));
-    
-    // check if the queue is fine
-    assertEquals("Start-time change garbled the waiting queue", 
-                 2, scheduler.getJobs("default").size());
-    
-    // II. Change job priority change
-    // Bump up job1's priority and make sure job1 bumps up in the queue
-    taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
-    
-    // Check if the priority changes are reflected
-    jobs = scheduler.jobQueuesManager.getWaitingJobQueue("default")
-                    .toArray(new JobInProgress[0]);
-    assertTrue("Priority change didnt not work as expected for job #1", 
-                jobs[0].getJobID().equals(fjob1.getJobID()));
-    assertTrue("Priority change didnt not work as expected for job #2", 
-                jobs[1].getJobID().equals(fjob2.getJobID()));
+    // test if changing the job priority/start-time works as expected in the 
+    // waiting queue
+    testJobOrderChange(fjob1, fjob2, true);
     
-    // check if the queue is fine
-    assertEquals("Priority change has garbled the waiting queue", 
-                 2, scheduler.getJobs("default").size());
+    // Init the jobs
+    // simulate the case where the job with a lower priority becomes running 
+    // first (maybe because of the setup tasks).
     
-    // Create an event
-    JobChangeEvent event = initTasksAndReportEvent(fjob1);
+    // init the lower ranked job first
+    JobChangeEvent event = initTasksAndReportEvent(fjob2);
     
     // inform the scheduler
     scheduler.jobQueuesManager.jobUpdated(event);
     
-    // waiting queue
-    Collection<JobInProgress> wqueue = 
-      scheduler.jobQueuesManager.getWaitingJobQueue("default");
-    
-    // check if the job is not in the waiting queue
-    assertFalse("Waiting queue contains running/inited job", 
-                wqueue.contains(fjob1));
+    // init the higher ordered job later
+    event = initTasksAndReportEvent(fjob1);
     
-    // check if the waiting queue is fine
-    assertEquals("Waiting queue is garbled on job init", 1, wqueue.size());
-    
-    Collection<JobInProgress> rqueue = 
-      scheduler.jobQueuesManager.getRunningJobQueue("default");
-    
-    // check if the job is in the running queue
-    assertTrue("Running queue doesnt contain running/inited job", 
-                rqueue.contains(fjob1));
+    // inform the scheduler
+    scheduler.jobQueuesManager.jobUpdated(event);
     
-    // check if the running queue is fine
-    assertEquals("Running queue is garbled upon init", 1, rqueue.size());
+    // check if the jobs are missing from the waiting queue
+    assertEquals("Waiting queue is garbled on job init", 0, 
+                 scheduler.jobQueuesManager.getWaitingJobQueue("default")
+                          .size());
+    
+    // test if changing the job priority/start-time works as expected in the 
+    // running queue
+    testJobOrderChange(fjob1, fjob2, false);
     
     // schedule a task
     List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
@@ -578,7 +540,8 @@
     // mark the job as complete
     taskTrackerManager.finalizeJob(fjob1);
     
-    rqueue = scheduler.jobQueuesManager.getRunningJobQueue("default");
+    Collection<JobInProgress> rqueue = 
+      scheduler.jobQueuesManager.getRunningJobQueue("default");
     
     // check if the job is removed from the scheduler
     assertFalse("Scheduler contains completed job", 
@@ -586,8 +549,67 @@
     
     // check if the running queue size is correct
     assertEquals("Job finish garbles the queue", 
-                 0, rqueue.size());
+                 1, rqueue.size());
+    
+  }
+  
+  // test if the queue reflects the changes
+  private void testJobOrderChange(FakeJobInProgress fjob1, 
+                                  FakeJobInProgress fjob2, 
+                                  boolean waiting) {
+    String queueName = waiting ? "waiting" : "running";
+    
+    // check if the jobs in the queue are the right order
+    JobInProgress[] jobs = getJobsInQueue(waiting);
+    assertTrue(queueName + " queue doesnt contain job #1 in right order", 
+                jobs[0].getJobID().equals(fjob1.getJobID()));
+    assertTrue(queueName + " queue doesnt contain job #2 in right order", 
+                jobs[1].getJobID().equals(fjob2.getJobID()));
+    
+    // I. Check the start-time change
+    // Change job2 start-time and check if job2 bumps up in the queue 
+    taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
+    
+    jobs = getJobsInQueue(waiting);
+    assertTrue("Start time change didnt not work as expected for job #2 in "
+               + queueName + " queue", 
+                jobs[0].getJobID().equals(fjob2.getJobID()));
+    assertTrue("Start time change didnt not work as expected for job #1 in"
+               + queueName + " queue", 
+                jobs[1].getJobID().equals(fjob1.getJobID()));
     
+    // check if the queue is fine
+    assertEquals("Start-time change garbled the " + queueName + " queue", 
+                 2, jobs.length);
+    
+    // II. Check the priority change
+    // Bump up job1's priority and make sure job1 bumps up in the queue
+    taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
+    
+    // Check if the priority changes are reflected
+    jobs = getJobsInQueue(waiting);
+    assertTrue("Priority change didnt not work as expected for job #1 in "
+               + queueName + " queue",  
+                jobs[0].getJobID().equals(fjob1.getJobID()));
+    assertTrue("Priority change didnt not work as expected for job #2 in "
+               + queueName + " queue",  
+                jobs[1].getJobID().equals(fjob2.getJobID()));
+    
+    // check if the queue is fine
+    assertEquals("Priority change has garbled the " + queueName + " queue", 
+                 2, jobs.length);
+    
+    // reset the queue state back to normal
+    taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
+    taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
+  }
+  
+  private JobInProgress[] getJobsInQueue(boolean waiting) {
+    Collection<JobInProgress> queue = 
+      waiting 
+      ? scheduler.jobQueuesManager.getWaitingJobQueue("default")
+      : scheduler.jobQueuesManager.getRunningJobQueue("default");
+    return queue.toArray(new JobInProgress[0]);
   }
   
   /*protected void submitJobs(int number, int state, int maps, int reduces)