You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/20 11:18:50 UTC
svn commit: r746193 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: ddas
Date: Fri Feb 20 10:18:50 2009
New Revision: 746193
URL: http://svn.apache.org/viewvc?rev=746193&view=rev
Log:
HADOOP-5269. Fixes a problem where the TaskTracker holds on to FAILED_UNCLEAN or KILLED_UNCLEAN tasks forever. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Feb 20 10:18:50 2009
@@ -801,6 +801,9 @@
HADOOP-5255. Fix use of Math.abs to avoid overflow. (Jonathan Ellis via
cdouglas)
+ HADOOP-5269. Fixes a problem to do with tasktracker holding on to FAILED_UNCLEAN
+ or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java Fri Feb 20 10:18:50 2009
@@ -169,9 +169,13 @@
umbilical.fsError(taskid, e.getMessage());
} catch (Throwable throwable) {
LOG.warn("Error running child", throwable);
- if (task != null) {
- // do cleanup for the task
- task.taskCleanup(umbilical);
+ try {
+ if (task != null) {
+ // do cleanup for the task
+ task.taskCleanup(umbilical);
+ }
+ } catch (Throwable th) {
+ LOG.info("Error cleaning up" + th);
}
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java Fri Feb 20 10:18:50 2009
@@ -129,7 +129,7 @@
}
}
- public void abortTask(TaskAttemptContext context) {
+ public void abortTask(TaskAttemptContext context) throws IOException {
Path taskOutputPath = getTempTaskOutputPath(context);
try {
if (taskOutputPath != null) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Feb 20 10:18:50 2009
@@ -2130,6 +2130,34 @@
releaseSlot();
}
+ /* State changes:
+ * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED/KILLED_UNCLEAN/KILLED
+ * FAILED_UNCLEAN -> FAILED
+ * KILLED_UNCLEAN -> KILLED
+ */
+ private void setTaskFailState(boolean wasFailure) {
+ // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.KILLED);
+ } else if (task.isMapOrReduce() &&
+ taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+ if (wasFailure) {
+ taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+ } else {
+ taskStatus.setRunState(TaskStatus.State.KILLED_UNCLEAN);
+ }
+ } else {
+ if (wasFailure) {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else {
+ taskStatus.setRunState(TaskStatus.State.KILLED);
+ }
+ }
+ }
+
/**
* The task has actually finished running.
*/
@@ -2156,22 +2184,7 @@
if (!done) {
if (!wasKilled) {
failures += 1;
- /* State changes:
- * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
- * FAILED_UNCLEAN -> FAILED
- * KILLED_UNCLEAN -> KILLED
- */
- if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
- taskStatus.setRunState(TaskStatus.State.FAILED);
- } else if (taskStatus.getRunState() ==
- TaskStatus.State.KILLED_UNCLEAN) {
- taskStatus.setRunState(TaskStatus.State.KILLED);
- } else if (task.isMapOrReduce() &&
- taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
- taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
- } else {
- taskStatus.setRunState(TaskStatus.State.FAILED);
- }
+ setTaskFailState(true);
removeFromMemoryManager(task.getTaskID());
// call the script here for the failed tasks.
if (debugCommand != null) {
@@ -2384,20 +2397,9 @@
/**
* Something went wrong and the task must be killed.
*
- * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
- * FAILED_UNCLEAN -> FAILED
- * KILLED_UNCLEAN -> KILLED
- * UNASSIGNED -> FAILED/KILLED
* @param wasFailure was it a failure (versus a kill request)?
*/
public synchronized void kill(boolean wasFailure) throws IOException {
- /* State changes:
- * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
- * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
- * FAILED_UNCLEAN -> FAILED
- * KILLED_UNCLEAN -> KILLED
- * UNASSIGNED -> FAILED/KILLED
- */
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
isCleaningup()) {
@@ -2406,23 +2408,7 @@
failures += 1;
}
runner.kill();
- if (task.isMapOrReduce()) {
- taskStatus.setRunState((wasFailure) ?
- TaskStatus.State.FAILED_UNCLEAN :
- TaskStatus.State.KILLED_UNCLEAN);
- } else {
- // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
- if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
- taskStatus.setRunState(TaskStatus.State.FAILED);
- } else if (taskStatus.getRunState() ==
- TaskStatus.State.KILLED_UNCLEAN) {
- taskStatus.setRunState(TaskStatus.State.KILLED);
- } else {
- taskStatus.setRunState((wasFailure) ?
- TaskStatus.State.FAILED :
- TaskStatus.State.KILLED);
- }
- }
+ setTaskFailState(wasFailure);
} else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
if (wasFailure) {
failures += 1;
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=746193&r1=746192&r2=746193&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskFail.java Fri Feb 20 10:18:50 2009
@@ -49,6 +49,18 @@
}
}
+ static class CommitterWithFailTaskCleanup extends FileOutputCommitter {
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ System.exit(-1);
+ }
+ }
+
+ static class CommitterWithFailTaskCleanup2 extends FileOutputCommitter {
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ throw new IOException();
+ }
+ }
+
public RunningJob launchJob(JobConf conf,
Path inDir,
Path outDir,
@@ -79,7 +91,34 @@
// return the RunningJob handle.
return new JobClient(conf).submitJob(conf);
}
-
+
+ private void validateJob(RunningJob job, MiniMRCluster mr)
+ throws IOException {
+ assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+
+ JobID jobId = job.getID();
+ // construct the task id of first map task
+ TaskAttemptID attemptId =
+ new TaskAttemptID(new TaskID(jobId, true, 0), 0);
+ TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+ getTip(attemptId.getTaskID());
+ // this should not be cleanup attempt since the first attempt
+ // fails with an exception
+ assertTrue(!tip.isCleanupAttempt(attemptId));
+ TaskStatus ts =
+ mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+ assertTrue(ts != null);
+ assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+
+ attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
+ // this should be cleanup attempt since the second attempt fails
+ // with System.exit
+ assertTrue(tip.isCleanupAttempt(attemptId));
+ ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+ assertTrue(ts != null);
+ assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+ }
+
public void testWithDFS() throws IOException {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
@@ -91,39 +130,25 @@
dfs = new MiniDFSCluster(conf, 4, true, null);
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
- JobConf jobConf = mr.createJobConf();
final Path inDir = new Path("./input");
final Path outDir = new Path("./output");
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
- RunningJob job = null;
-
- job = launchJob(jobConf, inDir, outDir, input);
- // wait for the job to finish.
- while (!job.isComplete());
- assertEquals(JobStatus.SUCCEEDED, job.getJobState());
-
- JobID jobId = job.getID();
- // construct the task id of first map task
- TaskAttemptID attemptId =
- new TaskAttemptID(new TaskID(jobId, true, 0), 0);
- TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
- getTip(attemptId.getTaskID());
- // this should not be cleanup attempt since the first attempt
- // fails with an exception
- assertTrue(!tip.isCleanupAttempt(attemptId));
- TaskStatus ts =
- mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- assertTrue(ts != null);
- assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-
- attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
- // this should be cleanup attempt since the second attempt fails
- // with System.exit
- assertTrue(tip.isCleanupAttempt(attemptId));
- ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- assertTrue(ts != null);
- assertEquals(TaskStatus.State.FAILED, ts.getRunState());
-
+ // launch job with fail tasks
+ RunningJob rJob = launchJob(mr.createJobConf(), inDir, outDir, input);
+ rJob.waitForCompletion();
+ validateJob(rJob, mr);
+ // launch job with fail tasks and fail-cleanups
+ JobConf jobConf = mr.createJobConf();
+ fileSys.delete(outDir, true);
+ jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
+ rJob = launchJob(jobConf, inDir, outDir, input);
+ rJob.waitForCompletion();
+ validateJob(rJob, mr);
+ fileSys.delete(outDir, true);
+ jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
+ rJob = launchJob(jobConf, inDir, outDir, input);
+ rJob.waitForCompletion();
+ validateJob(rJob, mr);
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown(); }