You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/08/26 18:11:48 UTC

svn commit: r808087 - in /hadoop/common/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/test/ src/test/org/apache/hadoop/mapred/

Author: sharad
Date: Wed Aug 26 16:11:48 2009
New Revision: 808087

URL: http://svn.apache.org/viewvc?rev=808087&view=rev
Log:
MAPREDUCE-430. Fix a bug related to task getting stuck in case of OOM error. Contributed by Amar Kamat.

Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Wed Aug 26 16:11:48 2009
@@ -238,6 +238,9 @@
     HADOOP-6213. Remove commons dependency on commons-cli2. (Amar Kamat via
     sharad)
 
+    MAPREDUCE-430. Fix a bug related to task getting stuck in case of 
+    OOM error. (Amar Kamat via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java Wed Aug 26 16:11:48 2009
@@ -183,22 +183,32 @@
     } catch (FSError e) {
       LOG.fatal("FSError from child", e);
       umbilical.fsError(taskid, e.getMessage());
-    } catch (Throwable throwable) {
-      LOG.warn("Error running child", throwable);
+    } catch (Exception exception) {
+      LOG.warn("Error running child", exception);
       try {
         if (task != null) {
           // do cleanup for the task
           task.taskCleanup(umbilical);
         }
-      } catch (Throwable th) {
-        LOG.info("Error cleaning up" + th);
+      } catch (Exception e) {
+        LOG.info("Error cleaning up" + e);
       }
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      throwable.printStackTrace(new PrintStream(baos));
+      exception.printStackTrace(new PrintStream(baos));
       if (taskid != null) {
         umbilical.reportDiagnosticInfo(taskid, baos.toString());
       }
+    } catch (Throwable throwable) {
+      LOG.fatal("Error running child : "
+                + StringUtils.stringifyException(throwable));
+      if (taskid != null) {
+        Throwable tCause = throwable.getCause();
+        String cause = tCause == null 
+                       ? throwable.getMessage() 
+                       : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(taskid, cause);
+      }
     } finally {
       RPC.stopProxy(umbilical);
       MetricsContext metricsContext = MetricsUtil.getContext("mapred");

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Wed Aug 26 16:11:48 2009
@@ -59,6 +59,10 @@
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }
 
+    public void fatalError(TaskAttemptID taskId, String msg) throws IOException{
+      LOG.info("Task " + taskId + " reporting fatal error: " + msg);
+    }
+
     public JvmTask getTask(JVMId jvmId) throws IOException {
       return null;
     }

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Wed Aug 26 16:11:48 2009
@@ -342,6 +342,11 @@
       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
     }
     
+    public synchronized void fatalError(TaskAttemptID taskId, String msg) 
+    throws IOException {
+      LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
+    }
+    
     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
         int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
       return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed Aug 26 16:11:48 2009
@@ -64,6 +64,7 @@
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /** A Map task. */
 class MapTask extends Task {
@@ -278,6 +279,7 @@
   @Override
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException, ClassNotFoundException, InterruptedException {
+    this.umbilical = umbilical;
 
     // start thread that will handle communication with parent
     TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
@@ -1131,8 +1133,13 @@
             try {
               spillLock.unlock();
               sortAndSpill();
-            } catch (Throwable e) {
+            } catch (Exception e) {
               sortSpillException = e;
+            } catch (Throwable t) {
+              sortSpillException = t;
+              String logMsg = "Task " + getTaskID() + " failed : " 
+                              + StringUtils.stringifyException(t);
+              reportFatalError(getTaskID(), t, logMsg);
             } finally {
               spillLock.lock();
               if (bufend < bufindex && bufindex < bufstart) {

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Aug 26 16:11:48 2009
@@ -339,6 +339,7 @@
   @SuppressWarnings("unchecked")
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException, InterruptedException, ClassNotFoundException {
+    this.umbilical = umbilical;
     job.setBoolean("mapred.skip.on", isSkipping());
 
     if (isMapOrReduce()) {
@@ -374,10 +375,7 @@
       reduceCopier = new ReduceCopier(umbilical, job, reporter);
       if (!reduceCopier.fetchOutputs()) {
         if(reduceCopier.mergeThrowable instanceof FSError) {
-          LOG.error("Task: " + getTaskID() + " - FSError: " + 
-              StringUtils.stringifyException(reduceCopier.mergeThrowable));
-          umbilical.fsError(getTaskID(), 
-              reduceCopier.mergeThrowable.getMessage());
+          throw (FSError)reduceCopier.mergeThrowable;
         }
         throw new IOException("Task: " + getTaskID() + 
             " - The reduce copier failed", reduceCopier.mergeThrowable);
@@ -1220,8 +1218,9 @@
                       StringUtils.stringifyException(io));
             }
           } catch (Throwable th) {
-            LOG.error("Map output copy failure: " + 
-                      StringUtils.stringifyException(th));
+            String msg = getTaskID() + " : Map output copy failure : " 
+                         + StringUtils.stringifyException(th);
+            reportFatalError(getTaskID(), th, msg);
           }
         }
         
@@ -1654,10 +1653,14 @@
         if (bytesRead != mapOutputLength) {
           try {
             mapOutput.discard();
-          } catch (Throwable th) {
+          } catch (Exception ioe) {
             // IGNORED because we are cleaning up
             LOG.info("Failed to discard map-output from " + 
-                mapOutputLoc.getTaskAttemptId(), th);
+                mapOutputLoc.getTaskAttemptId(), ioe);
+          } catch (Throwable t) {
+            String msg = getTaskID() + " : Failed in shuffle to disk :" 
+                         + StringUtils.stringifyException(t);
+            reportFatalError(getTaskID(), t, msg);
           }
           mapOutput = null;
 
@@ -2119,9 +2122,9 @@
         try {
           getMapEventsThread.join();
           LOG.info("getMapsEventsThread joined.");
-        } catch (Throwable t) {
+        } catch (InterruptedException ie) {
           LOG.info("getMapsEventsThread threw an exception: " +
-              StringUtils.stringifyException(t));
+              StringUtils.stringifyException(ie));
         }
 
         synchronized (copiers) {
@@ -2153,13 +2156,13 @@
             inMemFSMergeThread.join();
             LOG.info("In-memory merge complete: " + 
                      mapOutputsFilesInMemory.size() + " files left.");
-            } catch (Throwable t) {
+            } catch (InterruptedException ie) {
             LOG.warn(reduceTask.getTaskID() +
                      " Final merge of the inmemory files threw an exception: " + 
-                     StringUtils.stringifyException(t));
+                     StringUtils.stringifyException(ie));
             // check if the last merge generated an error
             if (mergeThrowable != null) {
-              mergeThrowable = t;
+              mergeThrowable = ie;
             }
             return false;
           }
@@ -2466,14 +2469,18 @@
                      " Local output file is " + outputPath + " of size " +
                      localFileSys.getFileStatus(outputPath).getLen());
             }
-        } catch (Throwable t) {
+        } catch (Exception e) {
           LOG.warn(reduceTask.getTaskID()
                    + " Merging of the local FS files threw an exception: "
-                   + StringUtils.stringifyException(t));
+                   + StringUtils.stringifyException(e));
           if (mergeThrowable == null) {
-            mergeThrowable = t;
+            mergeThrowable = e;
           }
-        } 
+        } catch (Throwable t) {
+          String msg = getTaskID() + " : Failed to merge on the local FS" 
+                       + StringUtils.stringifyException(t);
+          reportFatalError(getTaskID(), t, msg);
+        }
       }
     }
 
@@ -2494,11 +2501,15 @@
               doInMemMerge();
             }
           } while (!exit);
-        } catch (Throwable t) {
+        } catch (Exception e) {
           LOG.warn(reduceTask.getTaskID() +
                    " Merge of the inmemory files threw an exception: "
-                   + StringUtils.stringifyException(t));
-          ReduceCopier.this.mergeThrowable = t;
+                   + StringUtils.stringifyException(e));
+          ReduceCopier.this.mergeThrowable = e;
+        } catch (Throwable t) {
+          String msg = getTaskID() + " : Failed to merge in memory" 
+                       + StringUtils.stringifyException(t);
+          reportFatalError(getTaskID(), t, msg);
         }
       }
       
@@ -2605,9 +2616,10 @@
             return;
           }
           catch (Throwable t) {
-            LOG.warn(reduceTask.getTaskID() +
-                " GetMapEventsThread Ignoring exception : " +
-                StringUtils.stringifyException(t));
+            String msg = reduceTask.getTaskID()
+                         + " GetMapEventsThread Ignoring exception : " 
+                         + StringUtils.stringifyException(t);
+            reportFatalError(getTaskID(), t, msg);
           }
         } while (!exitGetMapEvents);
 

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Wed Aug 26 16:11:48 2009
@@ -138,6 +138,7 @@
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
   protected final Counters.Counter spilledRecordsCounter;
   private String pidFile = "";
+  protected TaskUmbilicalProtocol umbilical;
 
   ////////////////////////////////////////////
   // Constructors
@@ -224,6 +225,24 @@
   protected void setWriteSkipRecs(boolean writeSkipRecs) {
     this.writeSkipRecs = writeSkipRecs;
   }
+
+  /**
+   * Report a fatal error to the parent (task) tracker.
+   */
+  protected void reportFatalError(TaskAttemptID id, Throwable throwable, 
+                                  String logMsg) {
+    LOG.fatal(logMsg);
+    Throwable tCause = throwable.getCause();
+    String cause = tCause == null 
+                   ? StringUtils.stringifyException(throwable)
+                   : StringUtils.stringifyException(tCause);
+    try {
+      umbilical.fatalError(id, cause);
+    } catch (IOException ioe) {
+      LOG.fatal("Failed to contact the tasktracker", ioe);
+      System.exit(-1);
+    }
+  }
   
   /**
    * Get skipRanges.

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed Aug 26 16:11:48 2009
@@ -2606,6 +2606,17 @@
     purgeTask(tip, true);
   }
 
+  /** 
+   * A child task had a fatal error. Kill the task.
+   */  
+  public synchronized void fatalError(TaskAttemptID taskId, String msg) 
+  throws IOException {
+    LOG.fatal("Task: " + taskId + " - Killed : " + msg);
+    TaskInProgress tip = runningTasks.get(taskId);
+    tip.reportDiagnosticInfo("Error: " + msg);
+    purgeTask(tip, true);
+  }
+
   public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
       JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id) 
   throws IOException {

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Aug 26 16:11:48 2009
@@ -53,9 +53,10 @@
    * Version 13 changed the getTask method signature for HADOOP-249
    * Version 14 changed the getTask method signature for HADOOP-4232
    * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
+   * Version 16 Added fatalError for child to communicate fatal errors to TT
    * */
 
-  public static final long versionID = 15L;
+  public static final long versionID = 16L;
   
   /**
    * Called when a child task process starts, to get its task.
@@ -128,6 +129,9 @@
   /** Report that the task encounted a local filesystem error.*/
   void fsError(TaskAttemptID taskId, String message) throws IOException;
 
+  /** Report that the task encountered a fatal error.*/
+  void fatalError(TaskAttemptID taskId, String message) throws IOException;
+  
   /** Called by a reduce task to get the map output locations for finished maps.
    * Returns an update centered around the map-task-completion-events. 
    * The update also piggybacks the information whether the events copy at the 

Modified: hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml Wed Aug 26 16:11:48 2009
@@ -50,4 +50,10 @@
 	  The switch condition fall through is intentional and for performance
 	  purposes.
     -->
+     <Match>
+       <Class name="org.apache.hadoop.mapred.Task" />
+       <Method name="reportFatalError" />
+       <Bug pattern="DM_EXIT" />
+     </Match>
+    
 </FindBugsFilter>

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java Wed Aug 26 16:11:48 2009
@@ -49,7 +49,9 @@
         throw new IOException();
       } else if (taskid.endsWith("_1")) {
         System.exit(-1);
-      } 
+      } else if (taskid.endsWith("_2")) {
+        throw new Error();
+      }
     }
   }
 
@@ -106,46 +108,55 @@
     return new JobClient(conf).submitJob(conf);
   }
   
+  private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId, 
+		  TaskStatus ts, boolean isCleanup) 
+  throws IOException {
+    assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
+    assertTrue(ts != null);
+    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+    // validate tasklogs for task attempt
+    String log = TestMiniMRMapRedDebugScript.readTaskLog(
+    TaskLog.LogName.STDERR, attemptId, false);
+    assertTrue(log.contains(taskLog));
+    if (!isCleanup) {
+      // validate task logs: tasklog should contain both task logs
+      // and cleanup logs
+      assertTrue(log.contains(cleanupLog));
+    } else {
+      // validate tasklogs for cleanup attempt
+      log = TestMiniMRMapRedDebugScript.readTaskLog(
+      TaskLog.LogName.STDERR, attemptId, true);
+      assertTrue(log.contains(cleanupLog));
+    }
+  }
+
   private void validateJob(RunningJob job, MiniMRCluster mr) 
   throws IOException {
     assertEquals(JobStatus.SUCCEEDED, job.getJobState());
 	    
     JobID jobId = job.getID();
     // construct the task id of first map task
+    // this should not be cleanup attempt since the first attempt 
+    // fails with an exception
     TaskAttemptID attemptId = 
       new TaskAttemptID(new TaskID(jobId, true, 0), 0);
     TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
                             getTip(attemptId.getTaskID());
-    // this should not be cleanup attempt since the first attempt 
-    // fails with an exception
-    assertTrue(!tip.isCleanupAttempt(attemptId));
     TaskStatus ts = 
       mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    assertTrue(ts != null);
-    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-    // validate task logs: tasklog should contain both task logs
-    // and cleanup logs
-    String log = TestMiniMRMapRedDebugScript.readTaskLog(
-                      TaskLog.LogName.STDERR, attemptId, false);
-    assertTrue(log.contains(taskLog));
-    assertTrue(log.contains(cleanupLog));
+    validateAttempt(tip, attemptId, ts, false);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, true, 0), 1);
     // this should be cleanup attempt since the second attempt fails
     // with System.exit
-    assertTrue(tip.isCleanupAttempt(attemptId));
     ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    assertTrue(ts != null);
-    assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-    // validate tasklogs for task attempt
-    log = TestMiniMRMapRedDebugScript.readTaskLog(
-               TaskLog.LogName.STDERR, attemptId, false);
-    assertTrue(log.contains(taskLog));
-
-    // validate tasklogs for cleanup attempt
-    log = TestMiniMRMapRedDebugScript.readTaskLog(
-               TaskLog.LogName.STDERR, attemptId, true);
-    assertTrue(log.contains(cleanupLog));
+    validateAttempt(tip, attemptId, ts, true);
+    
+    attemptId =  new TaskAttemptID(new TaskID(jobId, true, 0), 2);
+    // this should be cleanup attempt since the third attempt fails
+    // with Error
+    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true);
   }
   
   public void testWithDFS() throws IOException {