You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2010/02/15 15:23:03 UTC

svn commit: r910223 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/Task.java src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java

Author: yhemanth
Date: Mon Feb 15 14:23:02 2010
New Revision: 910223

URL: http://svn.apache.org/viewvc?rev=910223&view=rev
Log:
MAPREDUCE-1476. Fix the M/R framework to not call commit for special tasks like job setup/cleanup and task cleanup. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=910223&r1=910222&r2=910223&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Feb 15 14:23:02 2010
@@ -347,6 +347,10 @@
 
     cdouglas)
 
+    MAPREDUCE-1476. Fix the M/R framework to not call commit for special
+    tasks like job setup/cleanup and task cleanup.
+    (Amareshwari Sriramadasu via yhemanth)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=910223&r1=910222&r2=910223&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Feb 15 14:23:02 2010
@@ -46,10 +46,10 @@
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -58,7 +58,6 @@
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
 
 /** 
  * Base class for tasks.
@@ -744,8 +743,7 @@
              + " And is in the process of commiting");
     updateCounters();
 
-    // check whether the commit is required.
-    boolean commitRequired = committer.needsTaskCommit(taskContext);
+    boolean commitRequired = isCommitRequired();
     if (commitRequired) {
       int retries = MAX_RETRIES;
       setState(TaskStatus.State.COMMIT_PENDING);
@@ -775,6 +773,22 @@
   }
 
   /**
+   * Checks if this task has anything to commit, depending on the
+   * type of task, as well as on whether the {@link OutputCommitter}
+   * has anything to commit.
+   * 
+   * @return true if the task has to commit
+   * @throws IOException
+   */
+  boolean isCommitRequired() throws IOException {
+    boolean commitRequired = false;
+    if (isMapOrReduce()) {
+      commitRequired = committer.needsTaskCommit(taskContext);
+    }
+    return commitRequired;
+  }
+
+  /**
    * Send a status update to the task tracker
    * @param umbilical
    * @throws IOException

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java?rev=910223&r1=910222&r2=910223&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java Mon Feb 15 14:23:02 2010
@@ -17,12 +17,19 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.TaskType;
 
 public class TestTaskCommit extends HadoopTestCase {
+  Path rootDir = 
+    new Path(System.getProperty("test.build.data",  "/tmp"), "test");
 
   static class CommitterWithCommitFail extends FileOutputCommitter {
     public void commitTask(TaskAttemptContext context) throws IOException {
@@ -38,13 +45,43 @@
     }
   }
 
+  /**
+   * Committer whose {@code abortTask} is a no-op, so that temporary
+   * files written by a failed attempt are left in place.
+   *
+   * <p>The stock {@link FileOutputCommitter} removes an attempt's
+   * temporary output in {@code abortTask}; skipping that lets the test
+   * observe what a failed task would otherwise leave behind.
+   */
+  static class CommitterWithoutCleanup extends FileOutputCommitter {
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+      // Intentionally empty: leave the attempt's temporary files alone.
+    }
+  }
+  
+  /**
+   * Committer whose {@code needsTaskCommit} unconditionally answers yes.
+   */
+  static class CommitterThatAlwaysRequiresCommit extends FileOutputCommitter {
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context)
+        throws IOException {
+      return true; // every attempt is reported as having output to commit
+    }
+  }
+
   public TestTaskCommit() throws IOException {
+    // Configure the HadoopTestCase base with LOCAL_MR and LOCAL_FS.
     super(LOCAL_MR, LOCAL_FS, 1, 1);
   }
   
+  /**
+   * Runs the base-class tear-down and then deletes {@link #rootDir}.
+   * The deletion sits in a finally block so that it still happens when
+   * the superclass tear-down throws; otherwise stale output from one
+   * test could leak into the next.
+   */
+  @Override
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+    } finally {
+      FileUtil.fullyDelete(new File(rootDir.toString()));
+    }
+  }
+  
   public void testCommitFail() throws IOException {
-    Path rootDir = 
-      new Path(System.getProperty("test.build.data",  "/tmp"), "test");
     final Path inDir = new Path(rootDir, "./input");
     final Path outDir = new Path(rootDir, "./output");
     JobConf jobConf = createJobConf();
@@ -54,7 +91,187 @@
     rJob.waitForCompletion();
     assertEquals(JobStatus.FAILED, rJob.getJobState());
   }
+  
+  /**
+   * Stub {@link TaskUmbilicalProtocol} handed directly to {@code Task.run}.
+   *
+   * <p>It records whether the task reported {@code done}, and fails the
+   * test if the task ever calls {@code commitPending}. Declared
+   * {@code static} because it never touches the enclosing test instance,
+   * so it should not carry a hidden reference to it.
+   */
+  private static class MyUmbilical implements TaskUmbilicalProtocol {
+    /** Set to {@code true} once {@link #done} is called. */
+    boolean taskDone = false;
+
+    @Override
+    public boolean canCommit(TaskAttemptID taskid) throws IOException {
+      // Never grant commit permission.
+      return false;
+    }
+
+    @Override
+    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      // Reaching commit-pending is exactly the failure this test guards.
+      fail("Task should not go to commit-pending");
+    }
+
+    @Override
+    public void done(TaskAttemptID taskid) throws IOException {
+      taskDone = true;
+    }
+
+    @Override
+    public void fatalError(TaskAttemptID taskId, String message)
+        throws IOException {
+      // Ignored by this stub.
+    }
+
+    @Override
+    public void fsError(TaskAttemptID taskId, String message)
+        throws IOException {
+      // Ignored by this stub.
+    }
+
+    @Override
+    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+        int fromIndex, int maxLocs, TaskAttemptID id) throws IOException {
+      return null;
+    }
+
+    @Override
+    public JvmTask getTask(JvmContext context) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean ping(TaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    @Override
+    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
+        throws IOException {
+      // Ignored by this stub.
+    }
+
+    @Override
+    public void reportNextRecordRange(TaskAttemptID taskid, Range range)
+        throws IOException {
+      // Ignored by this stub.
+    }
+
+    @Override
+    public void shuffleError(TaskAttemptID taskId, String message)
+        throws IOException {
+      // Ignored by this stub.
+    }
+
+    @Override
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      return true;
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      // Arbitrary: the stub is passed directly to Task.run, not over RPC.
+      return 0;
+    }
+  }
+  
+  /**
+   * A test that mimics a failed task to ensure that it does
+   * not get into the COMMIT_PENDING state, by using a fake
+   * {@link TaskUmbilicalProtocol} implementation that fails the test
+   * if the commit protocol is played.
+   *
+   * <p>The test mocks the various steps in a failed task's
+   * life-cycle using a special OutputCommitter and UmbilicalProtocol
+   * implementation.
+   *
+   * @throws Exception if any step of the simulated life-cycle fails
+   */
+  public void testTaskCleanupDoesNotCommit() throws Exception {
+    // Mimic a job with a special committer that does not clean up
+    // files when a task fails.
+    JobConf job = new JobConf();
+    job.setOutputCommitter(CommitterWithoutCleanup.class);
+    Path outDir = new Path(rootDir, "output");
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    // Mimic job setup
+    String dummyAttemptID = "attempt_200707121733_0001_m_000000_0";
+    TaskAttemptID attemptID = TaskAttemptID.forName(dummyAttemptID);
+    OutputCommitter committer = new CommitterWithoutCleanup();
+    JobContext jContext = new JobContextImpl(job, attemptID.getJobID());
+    committer.setupJob(jContext);
+
+    // Mimic a map task
+    dummyAttemptID = "attempt_200707121733_0001_m_000001_0";
+    attemptID = TaskAttemptID.forName(dummyAttemptID);
+    Task task = new MapTask(null, attemptID, 0, null, 1);
+    task.setConf(job);
+    task.localizeConfiguration(job);
+    task.initialize(job, attemptID.getJobID(), Reporter.NULL, false);
+
+    // Mimic the map task writing some output.
+    String file = "test.txt";
+    FileSystem localFs = FileSystem.getLocal(job);
+    TextOutputFormat<Text, Text> theOutputFormat
+      = new TextOutputFormat<Text, Text>();
+    RecordWriter<Text, Text> theRecordWriter =
+      theOutputFormat.getRecordWriter(localFs,
+        job, file, Reporter.NULL);
+    try {
+      theRecordWriter.write(new Text("key"), new Text("value"));
+    } finally {
+      // Release the writer even if the write itself fails.
+      theRecordWriter.close(Reporter.NULL);
+    }
+
+    // Mimic a task failure; setting up the task for cleanup simulates
+    // the abort protocol to be played.
+    // Without checks in the framework, this will fail
+    // as the committer will cause a COMMIT to happen for
+    // the cleanup task.
+    task.setTaskCleanupTask();
+    MyUmbilical umbilical = new MyUmbilical();
+    task.run(job, umbilical);
+    assertTrue("Task did not succeed", umbilical.taskDone);
+  }
+
+  public void testCommitRequiredForMapTask() throws Exception {
+    assertTrue("MapTask should need commit",
+        createDummyTask(TaskType.MAP).isCommitRequired());
+  }
 
+  public void testCommitRequiredForReduceTask() throws Exception {
+    assertTrue("ReduceTask should need commit",
+        createDummyTask(TaskType.REDUCE).isCommitRequired());
+  }
+  
+  public void testCommitNotRequiredForJobSetup() throws Exception {
+    Task testTask = createDummyTask(TaskType.MAP);
+    testTask.setJobSetupTask();
+    assertFalse("Job setup task should not need commit", 
+        testTask.isCommitRequired());
+  }
+  
+  public void testCommitNotRequiredForJobCleanup() throws Exception {
+    Task testTask = createDummyTask(TaskType.MAP);
+    testTask.setJobCleanupTask();
+    assertFalse("Job cleanup task should not need commit", 
+        testTask.isCommitRequired());
+  }
+
+  public void testCommitNotRequiredForTaskCleanup() throws Exception {
+    Task testTask = createDummyTask(TaskType.REDUCE);
+    testTask.setTaskCleanupTask();
+    assertFalse("Task cleanup task should not need commit", 
+        testTask.isCommitRequired());
+  }
+
+  /**
+   * Creates an initialized map or reduce task for a job configured with
+   * {@link CommitterThatAlwaysRequiresCommit}, so that any "no commit"
+   * answer from {@link Task#isCommitRequired()} comes from the task itself.
+   *
+   * @param type {@link TaskType#MAP} for a map task or
+   *        {@link TaskType#REDUCE} for a reduce task
+   * @return the initialized task
+   * @throws IllegalArgumentException if {@code type} is neither MAP nor REDUCE
+   */
+  private Task createDummyTask(TaskType type)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    // Special task kinds are obtained by calling setJobSetupTask() etc. on
+    // a map/reduce task, never through this factory; reject them loudly
+    // instead of silently handing back a ReduceTask.
+    if (type != TaskType.MAP && type != TaskType.REDUCE) {
+      throw new IllegalArgumentException("Unsupported task type: " + type);
+    }
+    JobConf conf = new JobConf();
+    conf.setOutputCommitter(CommitterThatAlwaysRequiresCommit.class);
+    Path outDir = new Path(rootDir, "output");
+    FileOutputFormat.setOutputPath(conf, outDir);
+    JobID jobId = JobID.forName("job_201002121132_0001");
+    Task testTask;
+    if (type == TaskType.MAP) {
+      testTask = new MapTask();
+    } else {
+      testTask = new ReduceTask();
+    }
+    testTask.setConf(conf);
+    testTask.initialize(conf, jobId, Reporter.NULL, false);
+    return testTask;
+  }
+
+
+  
   public static void main(String[] argv) throws Exception {
     TestTaskCommit td = new TestTaskCommit();
     td.testCommitFail();