You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2010/02/15 15:23:03 UTC
svn commit: r910223 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/Task.java
src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
Author: yhemanth
Date: Mon Feb 15 14:23:02 2010
New Revision: 910223
URL: http://svn.apache.org/viewvc?rev=910223&view=rev
Log:
MAPREDUCE-1476. Fix the M/R framework to not call commit for special tasks like job setup/cleanup and task cleanup. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=910223&r1=910222&r2=910223&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Feb 15 14:23:02 2010
@@ -347,6 +347,10 @@
cdouglas)
+ MAPREDUCE-1476. Fix the M/R framework to not call commit for special
+ tasks like job setup/cleanup and task cleanup.
+ (Amareshwari Sriramadasu via yhemanth)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=910223&r1=910222&r2=910223&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Feb 15 14:23:02 2010
@@ -46,10 +46,10 @@
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -58,7 +58,6 @@
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
/**
* Base class for tasks.
@@ -744,8 +743,7 @@
+ " And is in the process of commiting");
updateCounters();
- // check whether the commit is required.
- boolean commitRequired = committer.needsTaskCommit(taskContext);
+ boolean commitRequired = isCommitRequired();
if (commitRequired) {
int retries = MAX_RETRIES;
setState(TaskStatus.State.COMMIT_PENDING);
@@ -775,6 +773,22 @@
}
/**
+ * Checks if this task has anything to commit, depending on the
+ * type of task, as well as on whether the {@link OutputCommitter}
+ * has anything to commit.
+ *
+ * @return true if the task has to commit
+ * @throws IOException
+ */
+ boolean isCommitRequired() throws IOException {
+ boolean commitRequired = false;
+ if (isMapOrReduce()) {
+ commitRequired = committer.needsTaskCommit(taskContext);
+ }
+ return commitRequired;
+ }
+
+ /**
* Send a status update to the task tracker
* @param umbilical
* @throws IOException
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java?rev=910223&r1=910222&r2=910223&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java Mon Feb 15 14:23:02 2010
@@ -17,12 +17,19 @@
*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.TaskType;
public class TestTaskCommit extends HadoopTestCase {
+ Path rootDir =
+ new Path(System.getProperty("test.build.data", "/tmp"), "test");
static class CommitterWithCommitFail extends FileOutputCommitter {
public void commitTask(TaskAttemptContext context) throws IOException {
@@ -38,13 +45,43 @@
}
}
+ /**
+ * Special Committer that does not cleanup temporary files in
+ * abortTask
+ *
+ * The framework's FileOutputCommitter cleans up any temporary
+ * files left behind in abortTask. We want the test case to
+ * find these files and hence short-circuit abortTask.
+ */
+ static class CommitterWithoutCleanup extends FileOutputCommitter {
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ // does nothing
+ }
+ }
+
+ /**
+ * Special committer that always requires commit.
+ */
+ static class CommitterThatAlwaysRequiresCommit extends FileOutputCommitter {
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context)
+ throws IOException {
+ return true;
+ }
+ }
+
public TestTaskCommit() throws IOException {
super(LOCAL_MR, LOCAL_FS, 1, 1);
}
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ FileUtil.fullyDelete(new File(rootDir.toString()));
+ }
+
public void testCommitFail() throws IOException {
- Path rootDir =
- new Path(System.getProperty("test.build.data", "/tmp"), "test");
final Path inDir = new Path(rootDir, "./input");
final Path outDir = new Path(rootDir, "./output");
JobConf jobConf = createJobConf();
@@ -54,7 +91,187 @@
rJob.waitForCompletion();
assertEquals(JobStatus.FAILED, rJob.getJobState());
}
+
+ private class MyUmbilical implements TaskUmbilicalProtocol {
+ boolean taskDone = false;
+
+ @Override
+ public boolean canCommit(TaskAttemptID taskid) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ fail("Task should not go to commit-pending");
+ }
+
+ @Override
+ public void done(TaskAttemptID taskid) throws IOException {
+ taskDone = true;
+ }
+
+ @Override
+ public void fatalError(TaskAttemptID taskId, String message)
+ throws IOException { }
+
+ @Override
+ public void fsError(TaskAttemptID taskId, String message)
+ throws IOException { }
+
+ @Override
+ public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+ int fromIndex, int maxLocs, TaskAttemptID id) throws IOException {
+ return null;
+ }
+
+ @Override
+ public JvmTask getTask(JvmContext context) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean ping(TaskAttemptID taskid) throws IOException {
+ return true;
+ }
+
+ @Override
+ public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
+ throws IOException {
+ }
+
+ @Override
+ public void reportNextRecordRange(TaskAttemptID taskid, Range range)
+ throws IOException {
+ }
+
+ @Override
+ public void shuffleError(TaskAttemptID taskId, String message)
+ throws IOException {
+ }
+
+ @Override
+ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ return true;
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return 0;
+ }
+ }
+
+ /**
+ * A test that mimics a failed task to ensure that it does
+ * not get into the COMMIT_PENDING state, by using a fake
+ * UmbilicalProtocol implementation that fails if the commit
+ * protocol is played.
+ *
+ * The test mocks the various steps in a failed task's
+ * life-cycle using a special OutputCommitter and UmbilicalProtocol
+ * implementation.
+ *
+ * @throws Exception
+ */
+ public void testTaskCleanupDoesNotCommit() throws Exception {
+ // Mimic a job with a special committer that does not cleanup
+ // files when a task fails.
+ JobConf job = new JobConf();
+ job.setOutputCommitter(CommitterWithoutCleanup.class);
+ Path outDir = new Path(rootDir, "output");
+ FileOutputFormat.setOutputPath(job, outDir);
+
+ // Mimic job setup
+ String dummyAttemptID = "attempt_200707121733_0001_m_000000_0";
+ TaskAttemptID attemptID = TaskAttemptID.forName(dummyAttemptID);
+ OutputCommitter committer = new CommitterWithoutCleanup();
+ JobContext jContext = new JobContextImpl(job, attemptID.getJobID());
+ committer.setupJob(jContext);
+
+
+ // Mimic a map task
+ dummyAttemptID = "attempt_200707121733_0001_m_000001_0";
+ attemptID = TaskAttemptID.forName(dummyAttemptID);
+ Task task = new MapTask(null, attemptID, 0, null, 1);
+ task.setConf(job);
+ task.localizeConfiguration(job);
+ task.initialize(job, attemptID.getJobID(), Reporter.NULL, false);
+
+ // Mimic the map task writing some output.
+ String file = "test.txt";
+ FileSystem localFs = FileSystem.getLocal(job);
+ TextOutputFormat<Text, Text> theOutputFormat
+ = new TextOutputFormat<Text, Text>();
+ RecordWriter<Text, Text> theRecordWriter =
+ theOutputFormat.getRecordWriter(localFs,
+ job, file, Reporter.NULL);
+ theRecordWriter.write(new Text("key"), new Text("value"));
+ theRecordWriter.close(Reporter.NULL);
+
+ // Mimic a task failure; setting up the task for cleanup simulates
+ // the abort protocol to be played.
+ // Without checks in the framework, this will fail
+ // as the committer will cause a COMMIT to happen for
+ // the cleanup task.
+ task.setTaskCleanupTask();
+ MyUmbilical umbilical = new MyUmbilical();
+ task.run(job, umbilical);
+ assertTrue("Task did not succeed", umbilical.taskDone);
+ }
+
+ public void testCommitRequiredForMapTask() throws Exception {
+ Task testTask = createDummyTask(TaskType.MAP);
+ assertTrue("MapTask should need commit", testTask.isCommitRequired());
+ }
+ public void testCommitRequiredForReduceTask() throws Exception {
+ Task testTask = createDummyTask(TaskType.REDUCE);
+ assertTrue("ReduceTask should need commit", testTask.isCommitRequired());
+ }
+
+ public void testCommitNotRequiredForJobSetup() throws Exception {
+ Task testTask = createDummyTask(TaskType.MAP);
+ testTask.setJobSetupTask();
+ assertFalse("Job setup task should not need commit",
+ testTask.isCommitRequired());
+ }
+
+ public void testCommitNotRequiredForJobCleanup() throws Exception {
+ Task testTask = createDummyTask(TaskType.MAP);
+ testTask.setJobCleanupTask();
+ assertFalse("Job cleanup task should not need commit",
+ testTask.isCommitRequired());
+ }
+
+ public void testCommitNotRequiredForTaskCleanup() throws Exception {
+ Task testTask = createDummyTask(TaskType.REDUCE);
+ testTask.setTaskCleanupTask();
+ assertFalse("Task cleanup task should not need commit",
+ testTask.isCommitRequired());
+ }
+
+ private Task createDummyTask(TaskType type) throws IOException, ClassNotFoundException,
+ InterruptedException {
+ JobConf conf = new JobConf();
+ conf.setOutputCommitter(CommitterThatAlwaysRequiresCommit.class);
+ Path outDir = new Path(rootDir, "output");
+ FileOutputFormat.setOutputPath(conf, outDir);
+ JobID jobId = JobID.forName("job_201002121132_0001");
+ Task testTask;
+ if (type == TaskType.MAP) {
+ testTask = new MapTask();
+ } else {
+ testTask = new ReduceTask();
+ }
+ testTask.setConf(conf);
+ testTask.initialize(conf, jobId, Reporter.NULL, false);
+ return testTask;
+ }
+
+
+
public static void main(String[] argv) throws Exception {
TestTaskCommit td = new TestTaskCommit();
td.testCommitFail();