You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/01/10 00:04:03 UTC

svn commit: r1431137 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...

Author: jlowe
Date: Wed Jan  9 23:04:03 2013
New Revision: 1431137

URL: http://svn.apache.org/viewvc?rev=1431137&view=rev
Log:
svn merge -c 1431131 FIXES: MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery. Contributed by Jerry Chen

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1431137&r1=1431136&r2=1431137&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Jan  9 23:04:03 2013
@@ -72,6 +72,9 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4913. TestMRAppMaster#testMRAppMasterMissingStaging occasionally 
     exits (Jason Lowe via tgraves)
 
+    MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry
+    Chen via jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1431137&r1=1431136&r2=1431137&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Jan  9 23:04:03 2013
@@ -579,7 +579,7 @@ public class MRAppMaster extends Composi
    */
   protected Recovery createRecoveryService(AppContext appContext) {
     return new RecoveryService(appContext.getApplicationAttemptId(),
-        appContext.getClock(), getCommitter());
+        appContext.getClock(), getCommitter(), isNewApiCommitter());
   }
 
   /** Create and initialize (but don't start) a single job. 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1431137&r1=1431136&r2=1431137&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Jan  9 23:04:03 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -100,6 +101,7 @@ public class RecoveryService extends Com
 
   private final ApplicationAttemptId applicationAttemptId;
   private final OutputCommitter committer;
+  private final boolean newApiCommitter;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
 
@@ -113,10 +115,11 @@ public class RecoveryService extends Com
   private volatile boolean recoveryMode = false;
 
   public RecoveryService(ApplicationAttemptId applicationAttemptId, 
-      Clock clock, OutputCommitter committer) {
+      Clock clock, OutputCommitter committer, boolean newApiCommitter) {
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
     this.committer = committer;
+    this.newApiCommitter = newApiCommitter;
     this.dispatcher = createRecoveryDispatcher();
     this.clock = new ControlledClock(clock);
       addService((Service) dispatcher);
@@ -360,8 +363,17 @@ public class RecoveryService extends Com
         switch (state) {
         case SUCCEEDED:
           //recover the task output
-          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
-              attInfo.getAttemptId());
+          
+          // check the committer type and construct corresponding context
+          TaskAttemptContext taskContext = null;
+          if(newApiCommitter) {
+            taskContext = new TaskAttemptContextImpl(getConfig(),
+                attInfo.getAttemptId());
+          } else {
+            taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
+                TypeConverter.fromYarn(aId));
+          }
+          
           try { 
             TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
             int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1431137&r1=1431136&r2=1431137&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Wed Jan  9 23:04:03 2013
@@ -626,6 +626,115 @@ public class TestRecovery {
     validateOutput();
   }
   
+  @Test
+  public void testRecoveryWithOldCommiter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    //before sending the TA_DONE, event make sure attempt has come to 
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+  
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+    
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
+    
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    //before sending the TA_DONE, event make sure attempt has come to 
+    //RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+    
+   //send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+  
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,