You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/10/23 13:15:36 UTC

svn commit: r707341 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java

Author: ddas
Date: Thu Oct 23 04:15:35 2008
New Revision: 707341

URL: http://svn.apache.org/viewvc?rev=707341&view=rev
Log:
HADOOP-4440.  TestJobInProgressListener tests for jobs killed in queued state. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=707341&r1=707340&r2=707341&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Oct 23 04:15:35 2008
@@ -65,6 +65,9 @@
 
     HADOOP-4408. FsAction functions need not create new objects. (cdouglas)
 
+    HADOOP-4440.  TestJobInProgressListener tests for jobs killed in queued 
+    state (Amar Kamat via ddas)
+
 Release 0.19.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=707341&r1=707340&r2=707341&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Thu Oct 23 04:15:35 2008
@@ -33,7 +33,7 @@
   private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
   
   protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
-  private EagerTaskInitializationListener eagerTaskInitializationListener;
+  protected EagerTaskInitializationListener eagerTaskInitializationListener;
   private float padFraction;
   
   public JobQueueTaskScheduler() {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=707341&r1=707340&r2=707341&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java Thu Oct 23 04:15:35 2008
@@ -242,9 +242,13 @@
           // RUNNING->COMPLETE(SUCCESS/KILLED/FAILED)
           JobInProgress jip = event.getJobInProgress();
           String jobId = jip.getJobID().toString();
-          if (statusEvent.getJobInProgress().isComplete()) {
+          if (jip.isComplete()) {
             LOG.info("Job " +  jobId + " deleted from the running queue");
-            jobs.remove(jip);
+            if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) {
+              wjobs.remove(jip);
+            } else {
+              jobs.remove(jip);
+            }
           } else {
             // PREP->RUNNING
             LOG.info("Job " +  jobId + " deleted from the waiting queue");
@@ -329,4 +333,58 @@
     assertFalse("Missing event notification for a successful job", 
                 myListener.contains(rJob.getID(), false));
   }
-}
\ No newline at end of file
+  
+  /**
+   * A scheduler whose eager task initializer is removed and terminated in
+   * start(), so submitted jobs are never initialized and stay queued forever.
+   */
+  public static class MyScheduler extends JobQueueTaskScheduler {
+
+    @Override
+    public synchronized void start() throws IOException {
+      super.start();
+      // Deregister the eager task initializer from the task tracker manager
+      taskTrackerManager.removeJobInProgressListener(
+          eagerTaskInitializationListener);
+      // ... and then terminate the initializer itself
+      eagerTaskInitializationListener.terminate();
+    }
+  }
+  
+  public void testQueuedJobKill() throws Exception {
+    LOG.info("Testing queued-job-kill");
+    // Flow: submit a job that stays queued, kill it, check listener state
+    MyListener myListener = new MyListener();
+    
+    JobConf job = new JobConf();
+    job.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
+                 TaskScheduler.class);
+    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, job);
+    // NOTE(review): mr is never shut down in this method - confirm/clean up
+    job = mr.createJobConf();
+    
+    mr.getJobTrackerRunner().getJobTracker()
+      .addJobInProgressListener(myListener);
+    
+    RunningJob rJob = TestJobKillAndFail.runJob(job);
+    JobID id = rJob.getID();
+    LOG.info("Job : " + id.toString() + " submitted");
+    
+    // verify that the listener saw the job enter the waiting queue
+    assertTrue("Missing event notification on submiting a job", 
+                myListener.contains(id, true));
+    
+    // kill the job while it is still queued
+    LOG.info("Killing job : " + id.toString());
+    rJob.killJob();
+    
+    // verify that the reported job state is now KILLED
+    assertEquals("Job status doesnt reflect the kill-job action", 
+                 JobStatus.KILLED, rJob.getJobState());
+
+    // verify that the kill removed the job
+    // from the listener's waiting list
+    assertFalse("Missing event notification on killing a waiting job", 
+                myListener.contains(id, true));
+  }
+}