You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/10/15 20:39:46 UTC

svn commit: r704990 - in /hadoop/core/branches/branch-0.19: ./ CHANGES.txt src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobTracker.java

Author: acmurthy
Date: Wed Oct 15 11:39:46 2008
New Revision: 704990

URL: http://svn.apache.org/viewvc?rev=704990&view=rev
Log:
Merge -r 704988:704989 from trunk to branch-0.19 to fix HADOOP-4236.

Modified:
    hadoop/core/branches/branch-0.19/   (props changed)
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Propchange: hadoop/core/branches/branch-0.19/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 15 11:39:46 2008
@@ -1 +1 @@
-/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748
+/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=704990&r1=704989&r2=704990&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Wed Oct 15 11:39:46 2008
@@ -867,6 +867,9 @@
     determine whether to canonicalize file paths or not.
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-4236. Ensure un-initialized jobs are killed correctly on
+    user-demand. (Sharad Agarwal via acmurthy) 
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=704990&r1=704989&r2=704990&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Wed Oct 15 11:39:46 2008
@@ -58,7 +58,7 @@
           LOG.error("Job initialization failed:\n" +
                     StringUtils.stringifyException(t));
           if (job != null) {
-            job.terminateJob(JobStatus.FAILED);
+            job.fail();
           }
         }
       }

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=704990&r1=704989&r2=704990&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 15 11:39:46 2008
@@ -142,6 +142,7 @@
 
   private JobConf conf;
   AtomicBoolean tasksInited = new AtomicBoolean(false);
+  private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
 
   private LocalFileSystem localFs;
   private JobID jobId;
@@ -340,6 +341,12 @@
     if (tasksInited.get()) {
       return;
     }
+    synchronized(jobInitKillStatus){
+      if(jobInitKillStatus.killed) {
+        return;
+      }
+      jobInitKillStatus.initStarted = true;
+    }
 
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
@@ -450,6 +457,15 @@
                        numReduceTasks + 1, jobtracker, conf, this);
     setup[1].setSetupTask();
     
+    synchronized(jobInitKillStatus){
+      jobInitKillStatus.initDone = true;
+      if(jobInitKillStatus.killed) {
+        //setup not launched so directly terminate
+        terminateJob(JobStatus.KILLED);
+        return;
+      }
+    }
+    
     tasksInited.set(true);
     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
                                  numMapTasks, numReduceTasks);
@@ -881,40 +897,47 @@
    * Return a CleanupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public synchronized Task obtainCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainCleanupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    if (!canLaunchCleanupTask()) {
-      return null;
-    }
-    
-    String taskTracker = tts.getTrackerName();
-    // Update the last-known clusterSize
-    this.clusterSize = clusterSize;
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if(!tasksInited.get()) {
       return null;
     }
     
-    List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
-    if (isMapSlot) {
-      cleanupTaskList.add(cleanup[0]);
-    } else {
-      cleanupTaskList.add(cleanup[1]);
-    }
-    TaskInProgress tip = findTaskFromList(cleanupTaskList,
-                           tts, numUniqueHosts, false);
-    if (tip == null) {
-      return null;
+    synchronized(this) {
+      if (!canLaunchCleanupTask()) {
+        return null;
+      }
+      
+      String taskTracker = tts.getTrackerName();
+      // Update the last-known clusterSize
+      this.clusterSize = clusterSize;
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
+      }
+      
+      List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
+      if (isMapSlot) {
+        cleanupTaskList.add(cleanup[0]);
+      } else {
+        cleanupTaskList.add(cleanup[1]);
+      }
+      TaskInProgress tip = findTaskFromList(cleanupTaskList,
+                             tts, numUniqueHosts, false);
+      if (tip == null) {
+        return null;
+      }
+      
+      // Now launch the cleanupTask
+      Task result = tip.getTaskToRun(tts.getTrackerName());
+      if (result != null) {
+        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+      }
+      return result;
     }
     
-    // Now launch the cleanupTask
-    Task result = tip.getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
-    }
-    return result;
   }
   
   /**
@@ -956,40 +979,46 @@
    * Return a SetupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public synchronized Task obtainSetupTask(TaskTrackerStatus tts, 
+  public Task obtainSetupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    if (!canLaunchSetupTask()) {
+    if(!tasksInited.get()) {
       return null;
     }
     
-    String taskTracker = tts.getTrackerName();
-    // Update the last-known clusterSize
-    this.clusterSize = clusterSize;
-    if (!shouldRunOnTaskTracker(taskTracker)) {
-      return null;
-    }
-    
-    List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
-    if (isMapSlot) {
-      setupTaskList.add(setup[0]);
-    } else {
-      setupTaskList.add(setup[1]);
-    }
-    TaskInProgress tip = findTaskFromList(setupTaskList,
-                           tts, numUniqueHosts, false);
-    if (tip == null) {
-      return null;
-    }
-    
-    // Now launch the setupTask
-    Task result = tip.getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+    synchronized(this) {
+      if (!canLaunchSetupTask()) {
+        return null;
+      }
+      
+      String taskTracker = tts.getTrackerName();
+      // Update the last-known clusterSize
+      this.clusterSize = clusterSize;
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
+      }
+      
+      List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
+      if (isMapSlot) {
+        setupTaskList.add(setup[0]);
+      } else {
+        setupTaskList.add(setup[1]);
+      }
+      TaskInProgress tip = findTaskFromList(setupTaskList,
+                             tts, numUniqueHosts, false);
+      if (tip == null) {
+        return null;
+      }
+      
+      // Now launch the setupTask
+      Task result = tip.getTaskToRun(tts.getTrackerName());
+      if (result != null) {
+        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+      }
+      return result;
     }
-    return result;
   }
   
   /**
@@ -1883,7 +1912,7 @@
     }
   }
   
-  synchronized void terminateJob(int jobTerminationState) {
+  private synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
       if (jobTerminationState == JobStatus.FAILED) {
@@ -1909,12 +1938,33 @@
 
   /**
    * Terminate the job and all its component tasks.
+   * Calling this will lead to marking the job as failed/killed. Cleanup 
+   * tip will be launched. If the job has not inited, it will directly call 
+   * terminateJob as there is no need to launch cleanup tip.
+   * This method is reentrant.
    * @param jobTerminationState job termination state
    */
   private synchronized void terminate(int jobTerminationState) {
+    if(!tasksInited.get()) {
+    	//init could not be done, we just terminate directly.
+      terminateJob(jobTerminationState);
+      return;
+    }
+
     if ((status.getRunState() == JobStatus.RUNNING) ||
          (status.getRunState() == JobStatus.PREP)) {
       LOG.info("Killing job '" + this.status.getJobID() + "'");
+      if (jobTerminationState == JobStatus.FAILED) {
+        if(jobFailed) {//reentrant
+          return;
+        }
+        jobFailed = true;
+      } else if (jobTerminationState == JobStatus.KILLED) {
+        if(jobKilled) {//reentrant
+          return;
+        }
+        jobKilled = true;
+      }
       //
       // kill all TIPs.
       //
@@ -1927,19 +1977,29 @@
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
-      if (jobTerminationState == JobStatus.FAILED) {
-        jobFailed = true;
-      } else if (jobTerminationState == JobStatus.KILLED) {
-        jobKilled = true;
-      }
     }
   }
   
   /**
-   * Kill the job and all its component tasks.
+   * Kill the job and all its component tasks. This method is called from 
+   * jobtracker and should return fast as it locks the jobtracker.
    */
-  public synchronized void kill() {
-    terminate(JobStatus.KILLED);
+  public void kill() {
+    boolean killNow = false;
+    synchronized(jobInitKillStatus) {
+      if(jobInitKillStatus.killed) {//job is already marked for killing
+        return;
+      }
+      jobInitKillStatus.killed = true;
+      //if not in middle of init, terminate it now
+      if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
+        //avoiding nested locking by setting flag
+        killNow = true;
+      }
+    }
+    if(killNow) {
+      terminate(JobStatus.KILLED);
+    }
   }
   
   /**
@@ -2312,4 +2372,17 @@
     this.schedulingInfo = schedulingInfo;
     this.status.setSchedulingInfo(schedulingInfo.toString());
   }
+  
+  /**
+   * To keep track of kill and initTasks status of this job. initTasks() take 
+   * a lock on JobInProgress object. kill should avoid waiting on 
+   * JobInProgress lock since it may take a while to do initTasks().
+   */
+  private static class JobInitKillStatus {
+    //flag to be set if kill is called
+    boolean killed;
+    
+    boolean initStarted;
+    boolean initDone;
+  }
 }

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=704990&r1=704989&r2=704990&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Oct 15 11:39:46 2008
@@ -2264,9 +2264,7 @@
   public synchronized void killJob(JobID jobid) throws IOException {
     JobInProgress job = jobs.get(jobid);
     checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
-    if (job.inited()) {
-      job.kill();
-    }
+    job.kill();
   }
 
   /**