You are viewing a plain-text version of this content; the hyperlink to the canonical version (originally on the word "here") was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/08/26 18:11:48 UTC
svn commit: r808087 - in /hadoop/common/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/test/ src/test/org/apache/hadoop/mapred/
Author: sharad
Date: Wed Aug 26 16:11:48 2009
New Revision: 808087
URL: http://svn.apache.org/viewvc?rev=808087&view=rev
Log:
MAPREDUCE-430. Fix a bug related to task getting stuck in case of OOM error. Contributed by Amar Kamat.
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Wed Aug 26 16:11:48 2009
@@ -238,6 +238,9 @@
HADOOP-6213. Remove commons dependency on commons-cli2. (Amar Kamat via
sharad)
+ MAPREDUCE-430. Fix a bug related to task getting stuck in case of
+ OOM error. (Amar Kamat via ddas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java Wed Aug 26 16:11:48 2009
@@ -183,22 +183,32 @@
} catch (FSError e) {
LOG.fatal("FSError from child", e);
umbilical.fsError(taskid, e.getMessage());
- } catch (Throwable throwable) {
- LOG.warn("Error running child", throwable);
+ } catch (Exception exception) {
+ LOG.warn("Error running child", exception);
try {
if (task != null) {
// do cleanup for the task
task.taskCleanup(umbilical);
}
- } catch (Throwable th) {
- LOG.info("Error cleaning up" + th);
+ } catch (Exception e) {
+ LOG.info("Error cleaning up" + e);
}
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- throwable.printStackTrace(new PrintStream(baos));
+ exception.printStackTrace(new PrintStream(baos));
if (taskid != null) {
umbilical.reportDiagnosticInfo(taskid, baos.toString());
}
+ } catch (Throwable throwable) {
+ LOG.fatal("Error running child : "
+ + StringUtils.stringifyException(throwable));
+ if (taskid != null) {
+ Throwable tCause = throwable.getCause();
+ String cause = tCause == null
+ ? throwable.getMessage()
+ : StringUtils.stringifyException(tCause);
+ umbilical.fatalError(taskid, cause);
+ }
} finally {
RPC.stopProxy(umbilical);
MetricsContext metricsContext = MetricsUtil.getContext("mapred");
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Wed Aug 26 16:11:48 2009
@@ -59,6 +59,10 @@
LOG.info("Task " + taskId + " reporting shuffle error: " + message);
}
+ public void fatalError(TaskAttemptID taskId, String msg) throws IOException{
+ LOG.info("Task " + taskId + " reporting fatal error: " + msg);
+ }
+
public JvmTask getTask(JVMId jvmId) throws IOException {
return null;
}
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Wed Aug 26 16:11:48 2009
@@ -342,6 +342,11 @@
LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
}
+ public synchronized void fatalError(TaskAttemptID taskId, String msg)
+ throws IOException {
+ LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
+ }
+
public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed Aug 26 16:11:48 2009
@@ -64,6 +64,7 @@
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
/** A Map task. */
class MapTask extends Task {
@@ -278,6 +279,7 @@
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
+ this.umbilical = umbilical;
// start thread that will handle communication with parent
TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
@@ -1131,8 +1133,13 @@
try {
spillLock.unlock();
sortAndSpill();
- } catch (Throwable e) {
+ } catch (Exception e) {
sortSpillException = e;
+ } catch (Throwable t) {
+ sortSpillException = t;
+ String logMsg = "Task " + getTaskID() + " failed : "
+ + StringUtils.stringifyException(t);
+ reportFatalError(getTaskID(), t, logMsg);
} finally {
spillLock.lock();
if (bufend < bufindex && bufindex < bufstart) {
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Aug 26 16:11:48 2009
@@ -339,6 +339,7 @@
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
+ this.umbilical = umbilical;
job.setBoolean("mapred.skip.on", isSkipping());
if (isMapOrReduce()) {
@@ -374,10 +375,7 @@
reduceCopier = new ReduceCopier(umbilical, job, reporter);
if (!reduceCopier.fetchOutputs()) {
if(reduceCopier.mergeThrowable instanceof FSError) {
- LOG.error("Task: " + getTaskID() + " - FSError: " +
- StringUtils.stringifyException(reduceCopier.mergeThrowable));
- umbilical.fsError(getTaskID(),
- reduceCopier.mergeThrowable.getMessage());
+ throw (FSError)reduceCopier.mergeThrowable;
}
throw new IOException("Task: " + getTaskID() +
" - The reduce copier failed", reduceCopier.mergeThrowable);
@@ -1220,8 +1218,9 @@
StringUtils.stringifyException(io));
}
} catch (Throwable th) {
- LOG.error("Map output copy failure: " +
- StringUtils.stringifyException(th));
+ String msg = getTaskID() + " : Map output copy failure : "
+ + StringUtils.stringifyException(th);
+ reportFatalError(getTaskID(), th, msg);
}
}
@@ -1654,10 +1653,14 @@
if (bytesRead != mapOutputLength) {
try {
mapOutput.discard();
- } catch (Throwable th) {
+ } catch (Exception ioe) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
- mapOutputLoc.getTaskAttemptId(), th);
+ mapOutputLoc.getTaskAttemptId(), ioe);
+ } catch (Throwable t) {
+ String msg = getTaskID() + " : Failed in shuffle to disk :"
+ + StringUtils.stringifyException(t);
+ reportFatalError(getTaskID(), t, msg);
}
mapOutput = null;
@@ -2119,9 +2122,9 @@
try {
getMapEventsThread.join();
LOG.info("getMapsEventsThread joined.");
- } catch (Throwable t) {
+ } catch (InterruptedException ie) {
LOG.info("getMapsEventsThread threw an exception: " +
- StringUtils.stringifyException(t));
+ StringUtils.stringifyException(ie));
}
synchronized (copiers) {
@@ -2153,13 +2156,13 @@
inMemFSMergeThread.join();
LOG.info("In-memory merge complete: " +
mapOutputsFilesInMemory.size() + " files left.");
- } catch (Throwable t) {
+ } catch (InterruptedException ie) {
LOG.warn(reduceTask.getTaskID() +
" Final merge of the inmemory files threw an exception: " +
- StringUtils.stringifyException(t));
+ StringUtils.stringifyException(ie));
// check if the last merge generated an error
if (mergeThrowable != null) {
- mergeThrowable = t;
+ mergeThrowable = ie;
}
return false;
}
@@ -2466,14 +2469,18 @@
" Local output file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
}
- } catch (Throwable t) {
+ } catch (Exception e) {
LOG.warn(reduceTask.getTaskID()
+ " Merging of the local FS files threw an exception: "
- + StringUtils.stringifyException(t));
+ + StringUtils.stringifyException(e));
if (mergeThrowable == null) {
- mergeThrowable = t;
+ mergeThrowable = e;
}
- }
+ } catch (Throwable t) {
+ String msg = getTaskID() + " : Failed to merge on the local FS"
+ + StringUtils.stringifyException(t);
+ reportFatalError(getTaskID(), t, msg);
+ }
}
}
@@ -2494,11 +2501,15 @@
doInMemMerge();
}
} while (!exit);
- } catch (Throwable t) {
+ } catch (Exception e) {
LOG.warn(reduceTask.getTaskID() +
" Merge of the inmemory files threw an exception: "
- + StringUtils.stringifyException(t));
- ReduceCopier.this.mergeThrowable = t;
+ + StringUtils.stringifyException(e));
+ ReduceCopier.this.mergeThrowable = e;
+ } catch (Throwable t) {
+ String msg = getTaskID() + " : Failed to merge in memory"
+ + StringUtils.stringifyException(t);
+ reportFatalError(getTaskID(), t, msg);
}
}
@@ -2605,9 +2616,10 @@
return;
}
catch (Throwable t) {
- LOG.warn(reduceTask.getTaskID() +
- " GetMapEventsThread Ignoring exception : " +
- StringUtils.stringifyException(t));
+ String msg = reduceTask.getTaskID()
+ + " GetMapEventsThread Ignoring exception : "
+ + StringUtils.stringifyException(t);
+ reportFatalError(getTaskID(), t, msg);
}
} while (!exitGetMapEvents);
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Wed Aug 26 16:11:48 2009
@@ -138,6 +138,7 @@
protected org.apache.hadoop.mapreduce.OutputCommitter committer;
protected final Counters.Counter spilledRecordsCounter;
private String pidFile = "";
+ protected TaskUmbilicalProtocol umbilical;
////////////////////////////////////////////
// Constructors
@@ -224,6 +225,24 @@
protected void setWriteSkipRecs(boolean writeSkipRecs) {
this.writeSkipRecs = writeSkipRecs;
}
+
+ /**
+ * Report a fatal error to the parent (task) tracker.
+ */
+ protected void reportFatalError(TaskAttemptID id, Throwable throwable,
+ String logMsg) {
+ LOG.fatal(logMsg);
+ Throwable tCause = throwable.getCause();
+ String cause = tCause == null
+ ? StringUtils.stringifyException(throwable)
+ : StringUtils.stringifyException(tCause);
+ try {
+ umbilical.fatalError(id, cause);
+ } catch (IOException ioe) {
+ LOG.fatal("Failed to contact the tasktracker", ioe);
+ System.exit(-1);
+ }
+ }
/**
* Get skipRanges.
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed Aug 26 16:11:48 2009
@@ -2606,6 +2606,17 @@
purgeTask(tip, true);
}
+ /**
+ * A child task had a fatal error. Kill the task.
+ */
+ public synchronized void fatalError(TaskAttemptID taskId, String msg)
+ throws IOException {
+ LOG.fatal("Task: " + taskId + " - Killed : " + msg);
+ TaskInProgress tip = runningTasks.get(taskId);
+ tip.reportDiagnosticInfo("Error: " + msg);
+ purgeTask(tip, true);
+ }
+
public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id)
throws IOException {
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Aug 26 16:11:48 2009
@@ -53,9 +53,10 @@
* Version 13 changed the getTask method signature for HADOOP-249
* Version 14 changed the getTask method signature for HADOOP-4232
* Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
+ * Version 16 Added fatalError for child to communicate fatal errors to TT
* */
- public static final long versionID = 15L;
+ public static final long versionID = 16L;
/**
* Called when a child task process starts, to get its task.
@@ -128,6 +129,9 @@
/** Report that the task encounted a local filesystem error.*/
void fsError(TaskAttemptID taskId, String message) throws IOException;
+ /** Report that the task encounted a fatal error.*/
+ void fatalError(TaskAttemptID taskId, String message) throws IOException;
+
/** Called by a reduce task to get the map output locations for finished maps.
* Returns an update centered around the map-task-completion-events.
* The update also piggybacks the information whether the events copy at the
Modified: hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-0.20/src/test/findbugsExcludeFile.xml Wed Aug 26 16:11:48 2009
@@ -50,4 +50,10 @@
The switch condition fall through is intentional and for performance
purposes.
-->
+ <Match>
+ <Class name="org.apache.hadoop.mapred.Task" />
+ <Method name="reportFatalError" />
+ <Bug pattern="DM_EXIT" />
+ </Match>
+
</FindBugsFilter>
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=808087&r1=808086&r2=808087&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java Wed Aug 26 16:11:48 2009
@@ -49,7 +49,9 @@
throw new IOException();
} else if (taskid.endsWith("_1")) {
System.exit(-1);
- }
+ } else if (taskid.endsWith("_2")) {
+ throw new Error();
+ }
}
}
@@ -106,46 +108,55 @@
return new JobClient(conf).submitJob(conf);
}
+ private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
+ TaskStatus ts, boolean isCleanup)
+ throws IOException {
+ assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
+ assertTrue(ts != null);
+ assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+ // validate tasklogs for task attempt
+ String log = TestMiniMRMapRedDebugScript.readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, false);
+ assertTrue(log.contains(taskLog));
+ if (!isCleanup) {
+ // validate task logs: tasklog should contain both task logs
+ // and cleanup logs
+ assertTrue(log.contains(cleanupLog));
+ } else {
+ // validate tasklogs for cleanup attempt
+ log = TestMiniMRMapRedDebugScript.readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, true);
+ assertTrue(log.contains(cleanupLog));
+ }
+ }
+
private void validateJob(RunningJob job, MiniMRCluster mr)
throws IOException {
assertEquals(JobStatus.SUCCEEDED, job.getJobState());
JobID jobId = job.getID();
// construct the task id of first map task
+ // this should not be cleanup attempt since the first attempt
+ // fails with an exception
TaskAttemptID attemptId =
new TaskAttemptID(new TaskID(jobId, true, 0), 0);
TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
getTip(attemptId.getTaskID());
- // this should not be cleanup attempt since the first attempt
- // fails with an exception
- assertTrue(!tip.isCleanupAttempt(attemptId));
TaskStatus ts =
mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- assertTrue(ts != null);
- assertEquals(TaskStatus.State.FAILED, ts.getRunState());
- // validate task logs: tasklog should contain both task logs
- // and cleanup logs
- String log = TestMiniMRMapRedDebugScript.readTaskLog(
- TaskLog.LogName.STDERR, attemptId, false);
- assertTrue(log.contains(taskLog));
- assertTrue(log.contains(cleanupLog));
+ validateAttempt(tip, attemptId, ts, false);
attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
// this should be cleanup attempt since the second attempt fails
// with System.exit
- assertTrue(tip.isCleanupAttempt(attemptId));
ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- assertTrue(ts != null);
- assertEquals(TaskStatus.State.FAILED, ts.getRunState());
- // validate tasklogs for task attempt
- log = TestMiniMRMapRedDebugScript.readTaskLog(
- TaskLog.LogName.STDERR, attemptId, false);
- assertTrue(log.contains(taskLog));
-
- // validate tasklogs for cleanup attempt
- log = TestMiniMRMapRedDebugScript.readTaskLog(
- TaskLog.LogName.STDERR, attemptId, true);
- assertTrue(log.contains(cleanupLog));
+ validateAttempt(tip, attemptId, ts, true);
+
+ attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 2);
+ // this should be cleanup attempt since the third attempt fails
+ // with Error
+ ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+ validateAttempt(tip, attemptId, ts, true);
}
public void testWithDFS() throws IOException {