You are viewing a plain-text version of this content. (The link to the canonical version did not survive conversion to plain text.)
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/20 11:18:50 UTC

svn commit: r746193 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: ddas
Date: Fri Feb 20 10:18:50 2009
New Revision: 746193

URL: http://svn.apache.org/viewvc?rev=746193&view=rev
Log:
HADOOP-5269. Fixes a problem where the tasktracker held on to tasks in the FAILED_UNCLEAN or KILLED_UNCLEAN state forever. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Feb 20 10:18:50 2009
@@ -801,6 +801,9 @@
     HADOOP-5255. Fix use of Math.abs to avoid overflow. (Jonathan Ellis via
     cdouglas)
 
+    HADOOP-5269. Fix the tasktracker holding on to FAILED_UNCLEAN or
+    KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java Fri Feb 20 10:18:50 2009
@@ -169,9 +169,13 @@
       umbilical.fsError(taskid, e.getMessage());
     } catch (Throwable throwable) {
       LOG.warn("Error running child", throwable);
-      if (task != null) {
-        // do cleanup for the task
-        task.taskCleanup(umbilical);
+      try {
+        if (task != null) {
+          // do cleanup for the task
+          task.taskCleanup(umbilical);
+        }
+      } catch (Throwable th) {
+        LOG.info("Error cleaning up" + th);
       }
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java Fri Feb 20 10:18:50 2009
@@ -129,7 +129,7 @@
     }
   }
 
-  public void abortTask(TaskAttemptContext context) {
+  public void abortTask(TaskAttemptContext context) throws IOException {
     Path taskOutputPath =  getTempTaskOutputPath(context);
     try {
       if (taskOutputPath != null) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Feb 20 10:18:50 2009
@@ -2130,6 +2130,34 @@
       releaseSlot();
     }
 
+    /** Advances taskStatus to its next failed or killed run state.
+     * FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED regardless of
+     * wasFailure. Otherwise a map/reduce task outside the CLEANUP phase goes
+     * to *_UNCLEAN, anything else straight to FAILED/KILLED (per wasFailure).
+     */
+    private void setTaskFailState(boolean wasFailure) {
+      // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+      if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+        taskStatus.setRunState(TaskStatus.State.FAILED);
+      } else if (taskStatus.getRunState() == 
+                 TaskStatus.State.KILLED_UNCLEAN) {
+        taskStatus.setRunState(TaskStatus.State.KILLED);
+      } else if (task.isMapOrReduce() && 
+                 taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+        if (wasFailure) {
+          taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+        } else {
+          taskStatus.setRunState(TaskStatus.State.KILLED_UNCLEAN);
+        }
+      } else { // not a map/reduce task, or already in the CLEANUP phase
+        if (wasFailure) {
+          taskStatus.setRunState(TaskStatus.State.FAILED);
+        } else {
+          taskStatus.setRunState(TaskStatus.State.KILLED);
+        }
+      }
+    }
+    
     /**
      * The task has actually finished running.
      */
@@ -2156,22 +2184,7 @@
         if (!done) {
           if (!wasKilled) {
             failures += 1;
-            /* State changes:
-             * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
-             * FAILED_UNCLEAN -> FAILED 
-             * KILLED_UNCLEAN -> KILLED 
-             */
-            if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
-              taskStatus.setRunState(TaskStatus.State.FAILED);
-            } else if (taskStatus.getRunState() == 
-                       TaskStatus.State.KILLED_UNCLEAN) {
-              taskStatus.setRunState(TaskStatus.State.KILLED);
-            } else if (task.isMapOrReduce() && 
-                       taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
-              taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
-            } else {
-              taskStatus.setRunState(TaskStatus.State.FAILED);
-            }
+            setTaskFailState(true);
             removeFromMemoryManager(task.getTaskID());
             // call the script here for the failed tasks.
             if (debugCommand != null) {
@@ -2384,20 +2397,9 @@
     /**
      * Something went wrong and the task must be killed.
      * 
-     * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
-     * FAILED_UNCLEAN -> FAILED 
-     * KILLED_UNCLEAN -> KILLED
-     * UNASSIGNED -> FAILED/KILLED
      * @param wasFailure was it a failure (versus a kill request)?
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
-      /* State changes:
-       * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
-       * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
-       * FAILED_UNCLEAN -> FAILED 
-       * KILLED_UNCLEAN -> KILLED
-       * UNASSIGNED -> FAILED/KILLED 
-       */
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
           taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
           isCleaningup()) {
@@ -2406,23 +2408,7 @@
           failures += 1;
         }
         runner.kill();
-        if (task.isMapOrReduce()) {
-          taskStatus.setRunState((wasFailure) ? 
-                                    TaskStatus.State.FAILED_UNCLEAN : 
-                                    TaskStatus.State.KILLED_UNCLEAN);
-        } else {
-          // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
-          if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
-            taskStatus.setRunState(TaskStatus.State.FAILED);
-          } else if (taskStatus.getRunState() == 
-                     TaskStatus.State.KILLED_UNCLEAN) {
-            taskStatus.setRunState(TaskStatus.State.KILLED);
-          } else {
-            taskStatus.setRunState((wasFailure) ? 
-                                      TaskStatus.State.FAILED : 
-                                      TaskStatus.State.KILLED);
-          }
-        }
+        setTaskFailState(wasFailure);
       } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
           failures += 1;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java Fri Feb 20 10:18:50 2009
@@ -49,6 +49,18 @@
     }
   }
 
+  static class CommitterWithFailTaskCleanup extends FileOutputCommitter {
+    public void abortTask(TaskAttemptContext context) throws IOException {
+      System.exit(-1); // exits whatever JVM runs abortTask: cleanup fails
+    }
+  }
+
+  static class CommitterWithFailTaskCleanup2 extends FileOutputCommitter {
+    public void abortTask(TaskAttemptContext context) throws IOException {
+      throw new IOException(); // cleanup fails by exception, not JVM exit
+    }
+  }
+
   public RunningJob launchJob(JobConf conf,
                               Path inDir,
                               Path outDir,
@@ -79,7 +91,34 @@
     // return the RunningJob handle.
     return new JobClient(conf).submitJob(conf);
   }
-		  
+  
+  private void validateJob(RunningJob job, MiniMRCluster mr)
+  throws IOException {
+    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+    // attempts #0 (exception) and #1 (System.exit) of map 0 must be FAILED
+    JobID jobId = job.getID();
+    // construct the attempt id of the first attempt of the first map task
+    TaskAttemptID attemptId =
+      new TaskAttemptID(new TaskID(jobId, true, 0), 0);
+    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+                            getTip(attemptId.getTaskID());
+    // this should not be a cleanup attempt since the first attempt
+    // fails with an exception
+    assertFalse(tip.isCleanupAttempt(attemptId));
+    TaskStatus ts =
+      mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+    assertNotNull(ts);
+    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+
+    attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
+    // this should be a cleanup attempt since the second attempt fails
+    // with System.exit
+    assertTrue(tip.isCleanupAttempt(attemptId));
+    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+    assertNotNull(ts);
+    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+  }
+  
   public void testWithDFS() throws IOException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
@@ -91,39 +130,25 @@
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
-      JobConf jobConf = mr.createJobConf();
       final Path inDir = new Path("./input");
       final Path outDir = new Path("./output");
       String input = "The quick brown fox\nhas many silly\nred fox sox\n";
-      RunningJob job = null;
-
-      job = launchJob(jobConf, inDir, outDir, input);
-      // wait for the job to finish.
-      while (!job.isComplete());
-      assertEquals(JobStatus.SUCCEEDED, job.getJobState());
-      
-      JobID jobId = job.getID();
-      // construct the task id of first map task
-      TaskAttemptID attemptId = 
-        new TaskAttemptID(new TaskID(jobId, true, 0), 0);
-      TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
-                              getTip(attemptId.getTaskID());
-      // this should not be cleanup attempt since the first attempt 
-      // fails with an exception
-      assertTrue(!tip.isCleanupAttempt(attemptId));
-      TaskStatus ts = 
-        mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-      assertTrue(ts != null);
-      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-      
-      attemptId =  new TaskAttemptID(new TaskID(jobId, true, 0), 1);
-      // this should be cleanup attempt since the second attempt fails
-      // with System.exit
-      assertTrue(tip.isCleanupAttempt(attemptId));
-      ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-      assertTrue(ts != null);
-      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-
+      // launch job with fail tasks
+      RunningJob rJob = launchJob(mr.createJobConf(), inDir, outDir, input);
+      rJob.waitForCompletion();
+      validateJob(rJob, mr);
+      // launch job with fail tasks and fail-cleanups
+      JobConf jobConf = mr.createJobConf();
+      fileSys.delete(outDir, true);
+      jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
+      rJob = launchJob(jobConf, inDir, outDir, input);
+      rJob.waitForCompletion();
+      validateJob(rJob, mr);
+      fileSys.delete(outDir, true);
+      jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
+      rJob = launchJob(jobConf, inDir, outDir, input);
+      rJob.waitForCompletion();
+      validateJob(rJob, mr);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }