You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/12/10 05:22:28 UTC

svn commit: r724968 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: yhemanth
Date: Tue Dec  9 20:22:27 2008
New Revision: 724968

URL: http://svn.apache.org/viewvc?rev=724968&view=rev
Log:
HADOOP-4731. Fix capacity scheduler to correctly remove job on completion from waiting queue. Contributed by Amar Kamat.

Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=724968&r1=724967&r2=724968&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Tue Dec  9 20:22:27 2008
@@ -24,6 +24,9 @@
     HADOOP-4727. Fix a group checking bug in fill_stat_structure(...) in
     fuse-dfs.  (Brian Bockelman via szetszwo)
 
+    HADOOP-4731. Fix capacity scheduler to correctly remove job on completion 
+    from waiting queue. (Amar Kamat via yhemanth)
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=724968&r1=724967&r2=724968&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Tue Dec  9 20:22:27 2008
@@ -143,7 +143,7 @@
     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
              + job.getProfile().getQueueName() + " has completed");
     // job could be in running or waiting queue
-    if (qi.runningJobs.remove(oldInfo) != null) {
+    if (qi.runningJobs.remove(oldInfo) == null) {
       qi.waitingJobs.remove(oldInfo);
     }
     // let scheduler know

Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=724968&r1=724967&r2=724968&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Dec  9 20:22:27 2008
@@ -139,6 +139,11 @@
     Set<TaskInProgress> getRunningReduces() {
       return (Set<TaskInProgress>)reduceTips;
     }
+    
+    @Override
+    public void kill() {
+      this.status.setRunState(JobStatus.KILLED); // only sets the run state to KILLED
+    }
   }
   
   static class FakeTaskInProgress extends TaskInProgress {
@@ -480,6 +485,63 @@
                                     oldStatus, newStatus);
   }
   
+  // Test job completion.
+  // Checks that killing and finalizing a job is reflected in the job queues when the job is
+  //   - still waiting (submitted in PREP state): the waiting queue must drop it
+  //   - already running: the running queue must drop it
+  public void testJobCompletion() throws IOException {
+    // configure a single queue named "default" and start the scheduler
+    taskTrackerManager.addQueues(new String[] {"default"});
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // submit the first job in PREP state to the "default" queue
+    FakeJobInProgress fjob1 = 
+      submitJob(JobStatus.PREP, 1, 0, "default", "user");
+    
+    // submit a second job in PREP state to the same queue
+    FakeJobInProgress fjob2 = 
+      submitJob(JobStatus.PREP, 1, 0, "default", "user");
+    
+    // kill the first job before it has been initialized, then finalize it
+    fjob1.kill();
+    taskTrackerManager.finalizeJob(fjob1);
+    
+    // exactly one job (the second one) should be left in the waiting queue
+    assertEquals("Waiting queue is garbled on waiting job-kill", 1, 
+                 scheduler.jobQueuesManager.getWaitingJobQueue("default")
+                          .size());
+    
+    // initialize the second job and capture the resulting change event
+    JobChangeEvent event = initTasksAndReportEvent(fjob2);
+    
+    // hand the change event to the scheduler's job queues manager
+    scheduler.jobQueuesManager.jobUpdated(event);
+    
+    // ask the scheduler to assign tasks to tracker "tt1"
+    List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+    
+    // finish the first task that was assigned on "tt1" for the second job
+    taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(), 
+                                  fjob2);
+    
+    // kill the second job after it has been scheduled
+    fjob2.kill();
+    
+    // finalize the second job
+    taskTrackerManager.finalizeJob(fjob2);
+    
+    // the running queue should be empty once the second job is finalized
+    assertEquals("Runnning queue is garbled on running job-kill", 0, 
+                 scheduler.jobQueuesManager.getRunningJobQueue("default")
+                          .size());
+    
+  }
+  
   // test job run-state change
   public void testJobRunStateChange() throws IOException {
     // start the scheduler