You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/24 10:44:55 UTC

svn commit: r1188044 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org...

Author: vinodkv
Date: Mon Oct 24 08:44:54 2011
New Revision: 1188044

URL: http://svn.apache.org/viewvc?rev=1188044&view=rev
Log:
MAPREDUCE-2708. Designed and implemented MR Application Master recovery to make MR AMs resume their progress after restart. Contributed by Sharad Agarwal.
svn merge -c r1188043 --ignore-ancestry ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Oct 24 08:44:54 2011
@@ -62,6 +62,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-3104. Implemented Application-acls. (vinodkv)
 
+    MAPREDUCE-2708. Designed and implemented MR Application Master recovery to
+    make MR AMs resume their progress after restart. (Sharad Agarwal via vinodkv)
+
   IMPROVEMENTS
 
     MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Mon Oct 24 08:44:54 2011
@@ -29,11 +29,9 @@ import org.apache.hadoop.mapred.TaskLog.
 import org.apache.hadoop.mapreduce.ID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Apps;
 
 public class MapReduceChildJVM {
 
@@ -131,6 +129,8 @@ public class MapReduceChildJVM {
         MRJobConfig.STDERR_LOGFILE_ENV,
         getTaskLogFile(TaskLog.LogName.STDERR)
         );
+    environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV, 
+        	conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
   }
 
   private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Mon Oct 24 08:44:54 2011
@@ -239,6 +239,14 @@ class YarnChild {
       Token<JobTokenIdentifier> jt) throws IOException {
     final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
     job.setCredentials(credentials);
+    
+    String appAttemptIdEnv = System
+        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+    LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
+    // Set it in conf so that it can be used by the OutputCommitter.
+    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
+        .parseInt(appAttemptIdEnv));
+
     // set tcp nodelay
     job.setBoolean("ipc.client.tcpnodelay", true);
     job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Oct 24 08:44:54 2011
@@ -36,18 +36,25 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
-import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -76,12 +83,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.YarnException;
@@ -93,6 +102,8 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
@@ -121,6 +132,9 @@ public class MRAppMaster extends Composi
 
   private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
 
+  private final RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
   private Clock clock;
   private final long startTime;
   private final long appSubmitTime;
@@ -143,6 +157,9 @@ public class MRAppMaster extends Composi
   private TaskAttemptListener taskAttemptListener;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
+  private JobId jobId;
+  private boolean newApiCommitter;
+  private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private boolean inRecovery = false;
 
@@ -182,15 +199,39 @@ public class MRAppMaster extends Composi
     // for an app later
     appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
 
-    if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
-         && appAttemptID.getAttemptId() > 1) {
-      LOG.info("Recovery is enabled. Will try to recover from previous life.");
-      recoveryServ = new RecoveryService(appAttemptID, clock);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+     
+    newApiCommitter = false;
+    jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
+        appAttemptID.getApplicationId().getId());
+    int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if ((numReduceTasks > 0 && 
+        conf.getBoolean("mapred.reducer.new-api", false)) ||
+          (numReduceTasks == 0 && 
+           conf.getBoolean("mapred.mapper.new-api", false)))  {
+      newApiCommitter = true;
+      LOG.info("Using mapred newApiCommitter.");
+    }
+
+    committer = createOutputCommitter(conf);
+    boolean recoveryEnabled = conf.getBoolean(
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+    if (recoveryEnabled && recoverySupportedByCommitter
+        && appAttemptID.getAttemptId() > 1) {
+      LOG.info("Recovery is enabled. "
+          + "Will try to recover from previous life on best effort basis.");
+      recoveryServ = new RecoveryService(appAttemptID, clock, 
+          committer);
       addIfService(recoveryServ);
       dispatcher = recoveryServ.getDispatcher();
       clock = recoveryServ.getClock();
       inRecovery = true;
     } else {
+      LOG.info("Not starting RecoveryService: recoveryEnabled: "
+          + recoveryEnabled + " recoverySupportedByCommitter: "
+          + recoverySupportedByCommitter + " ApplicationAttemptID: "
+          + appAttemptID.getAttemptId());
       dispatcher = new AsyncDispatcher();
       addIfService(dispatcher);
     }
@@ -253,7 +294,36 @@ public class MRAppMaster extends Composi
     super.init(conf);
   } // end of init()
 
-  
+  private OutputCommitter createOutputCommitter(Configuration conf) {
+    OutputCommitter committer = null;
+
+    LOG.info("OutputCommitter set in config "
+        + conf.get("mapred.output.committer.class"));
+
+    if (newApiCommitter) {
+      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
+          .newTaskId(jobId, 0, TaskType.MAP);
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
+          .newTaskAttemptId(taskID, 0);
+      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptID));
+      OutputFormat outputFormat;
+      try {
+        outputFormat = ReflectionUtils.newInstance(taskContext
+            .getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } catch (Exception e) {
+        throw new YarnException(e);
+      }
+    } else {
+      committer = ReflectionUtils.newInstance(conf.getClass(
+          "mapred.output.committer.class", FileOutputCommitter.class,
+          org.apache.hadoop.mapred.OutputCommitter.class), conf);
+    }
+    LOG.info("OutputCommitter is " + committer.getClass().getName());
+    return committer;
+  }
+
   protected boolean keepJobFiles(JobConf conf) {
     return (conf.getKeepTaskFilesPattern() != null || conf
         .getKeepFailedTaskFiles());
@@ -348,10 +418,10 @@ public class MRAppMaster extends Composi
   protected Job createJob(Configuration conf) {
 
     // create single job
-    Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
-        taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
-        completedTasksFromPreviousRun, metrics, currentUser.getUserName(),
-        appSubmitTime, amInfos);
+    Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher
+        .getEventHandler(), taskAttemptListener, jobTokenSecretManager,
+        fsTokens, clock, completedTasksFromPreviousRun, metrics, committer,
+        newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
@@ -471,6 +541,22 @@ public class MRAppMaster extends Composi
     return appAttemptID.getApplicationId();
   }
 
+  public ApplicationAttemptId getAttemptID() {
+    return appAttemptID;
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public OutputCommitter getCommitter() {
+    return committer;
+  }
+
+  public boolean isNewApiCommitter() {
+    return newApiCommitter;
+  }
+
   public int getStartCount() {
     return appAttemptID.getAttemptId();
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Oct 24 08:44:54 2011
@@ -39,15 +39,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -64,7 +61,6 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@@ -98,14 +94,11 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -126,15 +119,13 @@ public class JobImpl implements org.apac
 
   // Maximum no. of fetch-failure notifications after which map task is failed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
-
-  private final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
   
   //final fields
   private final ApplicationAttemptId applicationAttemptId;
   private final Clock clock;
   private final JobACLsManager aclsManager;
   private final String username;
+  private final OutputCommitter committer;
   private final Map<JobACL, AccessControlList> jobACLs;
   private final Set<TaskId> completedTasksFromPreviousRun;
   private final List<AMInfo> amInfos;
@@ -142,6 +133,7 @@ public class JobImpl implements org.apac
   private final Lock writeLock;
   private final JobId jobId;
   private final String jobName;
+  private final boolean newApiCommitter;
   private final org.apache.hadoop.mapreduce.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
   private final Object tasksSyncHandle = new Object();
@@ -167,7 +159,6 @@ public class JobImpl implements org.apac
   private Path remoteJobSubmitDir;
   public Path remoteJobConfFile;
   private JobContext jobContext;
-  private OutputCommitter committer;
   private int allowedMapFailuresPercent = 0;
   private int allowedReduceFailuresPercent = 0;
   private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
@@ -367,14 +358,16 @@ public class JobImpl implements org.apac
   private Token<JobTokenIdentifier> jobToken;
   private JobTokenSecretManager jobTokenSecretManager;
 
-  public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
-      EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+  public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
+      Configuration conf, EventHandler eventHandler,
+      TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
-      Credentials fsTokenCredentials, Clock clock, 
+      Credentials fsTokenCredentials, Clock clock,
       Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
-      String userName, long appSubmitTime, List<AMInfo> amInfos) {
+      OutputCommitter committer, boolean newApiCommitter, String userName,
+      long appSubmitTime, List<AMInfo> amInfos) {
     this.applicationAttemptId = applicationAttemptId;
-    this.jobId = recordFactory.newRecordInstance(JobId.class);
+    this.jobId = jobId;
     this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
     this.conf = conf;
     this.metrics = metrics;
@@ -383,15 +376,9 @@ public class JobImpl implements org.apac
     this.amInfos = amInfos;
     this.userName = userName;
     this.appSubmitTime = appSubmitTime;
-    ApplicationId applicationId = applicationAttemptId.getApplicationId();
-    jobId.setAppId(applicationId);
-    jobId.setId(applicationId.getId());
-    oldJobId = TypeConverter.fromYarn(jobId);
-    LOG.info("Job created" +
-    		" appId=" + applicationId + 
-    		" jobId=" + jobId + 
-    		" oldJobId=" + oldJobId);
-    
+    this.oldJobId = TypeConverter.fromYarn(jobId);
+    this.newApiCommitter = newApiCommitter;
+
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -400,6 +387,7 @@ public class JobImpl implements org.apac
 
     this.fsTokens = fsTokenCredentials;
     this.jobTokenSecretManager = jobTokenSecretManager;
+    this.committer = committer;
 
     this.aclsManager = new JobACLsManager(conf);
     this.username = System.getProperty("user.name");
@@ -854,47 +842,13 @@ public class JobImpl implements org.apac
 
         checkTaskLimits();
 
-        
-        boolean newApiCommitter = false;
-        if ((job.numReduceTasks > 0 && 
-            job.conf.getBoolean("mapred.reducer.new-api", false)) ||
-              (job.numReduceTasks == 0 && 
-               job.conf.getBoolean("mapred.mapper.new-api", false)))  {
-          newApiCommitter = true;
-          LOG.info("Using mapred newApiCommitter.");
-        }
-        
-        LOG.info("OutputCommitter set in config " + job.conf.get(
-            "mapred.output.committer.class"));
-        
-        if (newApiCommitter) {
+        if (job.newApiCommitter) {
           job.jobContext = new JobContextImpl(job.conf,
               job.oldJobId);
-          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
-          = RecordFactoryProvider.getRecordFactory(null)
-              .newRecordInstance(
-                  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
-          attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
-              .newRecordInstance(TaskId.class));
-          attemptID.getTaskId().setJobId(job.jobId);
-          attemptID.getTaskId().setTaskType(TaskType.MAP);
-          TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
-              TypeConverter.fromYarn(attemptID));
-          try {
-            OutputFormat outputFormat = ReflectionUtils.newInstance(
-                taskContext.getOutputFormatClass(), job.conf);
-            job.committer = outputFormat.getOutputCommitter(taskContext);
-          } catch(Exception e) {
-            throw new IOException("Failed to assign outputcommitter", e);
-          }
         } else {
           job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
               new JobConf(job.conf), job.oldJobId);
-          job.committer = ReflectionUtils.newInstance(
-              job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
-              org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
         }
-        LOG.info("OutputCommitter is " + job.committer.getClass().getName());
         
         long inputLength = 0;
         for (int i = 0; i < job.numMapTasks; ++i) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon Oct 24 08:44:54 2011
@@ -32,17 +32,23 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -53,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
@@ -84,9 +91,6 @@ import org.apache.hadoop.yarn.service.Se
 
 //TODO:
 //task cleanup for all non completed tasks
-//change job output committer to have 
-//    - atomic job output promotion
-//    - recover output of completed tasks
 
 public class RecoveryService extends CompositeService implements Recovery {
 
@@ -95,6 +99,7 @@ public class RecoveryService extends Com
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
 
   private final ApplicationAttemptId applicationAttemptId;
+  private final OutputCommitter committer;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
 
@@ -108,9 +113,10 @@ public class RecoveryService extends Com
   private volatile boolean recoveryMode = false;
 
   public RecoveryService(ApplicationAttemptId applicationAttemptId, 
-      Clock clock) {
+      Clock clock, OutputCommitter committer) {
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
+    this.committer = committer;
     this.dispatcher = new RecoveryDispatcher();
     this.clock = new ControlledClock(clock);
       addService((Service) dispatcher);
@@ -122,17 +128,17 @@ public class RecoveryService extends Com
     // parse the history file
     try {
       parse();
-      if (completedTasks.size() > 0) {
-        recoveryMode = true;
-        LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " + 
-            "TO RECOVER " + completedTasks.size());
-        LOG.info("Job launch time " + jobInfo.getLaunchTime());
-        clock.setTime(jobInfo.getLaunchTime());
-      }
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.warn(e);
       LOG.warn("Could not parse the old history file. Aborting recovery. "
-          + "Starting afresh.");
+          + "Starting afresh.", e);
+    }
+    if (completedTasks.size() > 0) {
+      recoveryMode = true;
+      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+          + "TO RECOVER " + completedTasks.size());
+      LOG.info("Job launch time " + jobInfo.getLaunchTime());
+      clock.setTime(jobInfo.getLaunchTime());
     }
   }
 
@@ -315,6 +321,20 @@ public class RecoveryService extends Com
         TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
         switch (state) {
         case SUCCEEDED:
+          //recover the task output
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
+              attInfo.getAttemptId());
+          try {
+            committer.recoverTask(taskContext);
+          } catch (IOException e) {
+            actualHandler.handle(new JobDiagnosticsUpdateEvent(
+                aId.getTaskId().getJobId(), "Error in recovering task output " + 
+                e.getMessage()));
+            actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
+                JobEventType.INTERNAL_ERROR));
+          }
+          LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+          
           // send the done event
           LOG.info("Sending done event to " + aId);
           actualHandler.handle(new TaskAttemptEvent(aId,
@@ -334,6 +354,16 @@ public class RecoveryService extends Com
         return;
       }
 
+      else if (event.getType() == 
+        ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
+        TaskAttemptId aId = ((ContainerLauncherEvent) event)
+          .getTaskAttemptID();
+        actualHandler.handle(
+           new TaskAttemptEvent(aId,
+                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        return;
+      }
+
       // delegate to the actual handler
       actualHandler.handle(event);
     }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Mon Oct 24 08:44:54 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -264,9 +265,11 @@ public class MRApp extends MRAppMaster {
     } catch (IOException e) {
       throw new YarnException(e);
     }
-    Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
-                             getTaskAttemptListener(), getContext().getClock(),
-                             currentUser.getUserName());
+    Job newJob = new TestJob(getJobId(), getAttemptID(), conf, 
+    		getDispatcher().getEventHandler(),
+            getTaskAttemptListener(), getContext().getClock(),
+            getCommitter(), isNewApiCommitter(),
+            currentUser.getUserName());
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -413,13 +416,15 @@ public class MRApp extends MRAppMaster {
       return localStateMachine;
     }
 
-    public TestJob(Configuration conf, ApplicationId applicationId,
-        EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
-        Clock clock, String user) {
-      super(getApplicationAttemptId(applicationId, getStartCount()), conf,
-          eventHandler, taskAttemptListener, new JobTokenSecretManager(),
-          new Credentials(), clock, getCompletedTaskFromPreviousRun(), metrics,
-          user, System.currentTimeMillis(), getAllAMInfos());
+    public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+        Configuration conf, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener, Clock clock,
+        OutputCommitter committer, boolean newApiCommitter, String user) {
+      super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
+          conf, eventHandler, taskAttemptListener,
+          new JobTokenSecretManager(), new Credentials(), clock,
+          getCompletedTaskFromPreviousRun(), metrics, committer,
+          newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
 
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon Oct 24 08:44:54 2011
@@ -342,10 +342,10 @@ public class TestRMContainerAllocator {
 
     public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
         int numMaps, int numReduces) {
-      super(appAttemptID, conf, null, null, null, null, null, null, null, null,
-          System.currentTimeMillis(), null);
-      this.jobId = MRBuilderUtils
-          .newJobId(appAttemptID.getApplicationId(), 0);
+      super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
+          appAttemptID, conf, null, null, null, null, null, null, null, null,
+          true, null, System.currentTimeMillis(), null);
+      this.jobId = getID(); // presumably the id given to super() - confirm
       this.numMaps = numMaps;
       this.numReduces = numReduces;
     }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Mon Oct 24 08:44:54 2011
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -25,10 +28,21 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@@ -37,20 +51,34 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
 public class TestRecovery {
 
   private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+  private static Path outputDir = new Path(new File("target", 
+      TestRecovery.class.getName()).getAbsolutePath() + 
+      Path.SEPARATOR + "out");
+  private static String partFile = "part-r-00000";
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+
 
   @Test
   public void testCrashed() throws Exception {
+
     int runCount = 0;
     long am1StartTimeEst = System.currentTimeMillis();
     MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
     Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
     long jobStartTime = job.getReport().getStartTime();
@@ -135,6 +163,9 @@ public class TestRecovery {
     app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
     conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
@@ -201,7 +232,165 @@ public class TestRecovery {
     // TODO Add verification of additional data from jobHistory - whatever was
     // available in the failed attempt should be available here
   }
+  // Output committed by a reduce before the app stops must survive the rerun.
+  @Test
+  public void testOutputRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // the only map task must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    // before sending the TA_DONE event, make sure the attempt has reached
+    // RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
   
+    // send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // wait for the map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write and commit output for the first reduce's attempt
+    writeOutput(reduce1Attempt1, conf);
+    
+    // send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for the first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    // stop the app before the job completes; the 2nd reduce was never finished
+    app.stop();
+
+    // rerun with recovery enabled:
+    // the map and the first reduce should be recovered from the previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
+    
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has reached
+    // RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+    
+    // send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // wait for it to complete
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput(); // checks the part file written during the first run
+  }
+
+  // Writes a fixed set of records, some with null or NullWritable keys and
+  // values, as the output of the given attempt and then commits the task.
+  private void writeOutput(TaskAttempt attempt, Configuration conf)
+    throws Exception {
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf,
+        TypeConverter.fromYarn(attempt.getID()));
+    NullWritable nothing = NullWritable.get();
+    // Key/value pairs in the order they are handed to the record writer.
+    Object[][] records = {
+        {key1, val1}, {null, nothing}, {null, val1}, {nothing, val2},
+        {key2, nothing}, {key1, null}, {null, null}, {key2, val2}};
+
+    TextOutputFormat<?, ?> textFormat = new TextOutputFormat();
+    RecordWriter writer = textFormat.getRecordWriter(attemptContext);
+    try {
+      for (Object[] record : records) {
+        writer.write(record[0], record[1]);
+      }
+    } finally {
+      writer.close(attemptContext);
+    }
+
+    // Commit using the committer of the output format named in the context.
+    OutputFormat format = ReflectionUtils.newInstance(
+        attemptContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = format.getOutputCommitter(attemptContext);
+    committer.commitTask(attemptContext);
+  }
+
+  // Compares the part file under outputDir with the lines this test expects
+  // to result from the records written by writeOutput().
+  private void validateOutput() throws IOException {
+    StringBuilder expectedOutput = new StringBuilder();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n").append(val2).append("\n");
+    expectedOutput.append(key2).append("\n").append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = slurp(new File(new Path(outputDir, partFile).toString()));
+    // assertEquals takes (expected, actual); keeps failure messages truthful.
+    Assert.assertEquals(expectedOutput.toString(), output);
+  }
+
+  /** Reads all of {@code f} and returns its bytes decoded as UTF-8. */
+  public static String slurp(File f) throws IOException {
+    byte[] buf = new byte[(int) f.length()];
+    int off = 0; // bytes read so far: a single read() may stop short
+    FileInputStream in = new FileInputStream(f);
+    try {
+      for (int n; (n = in.read(buf, off, buf.length - off)) > 0;) {
+        off += n;
+      }
+    } finally {
+      in.close();
+    }
+    return new String(buf, 0, off, "UTF-8");
+  }
+
   class MRAppWithHistory extends MRApp {
     public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart, int startCount) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Oct 24 08:44:54 2011
@@ -449,6 +449,8 @@ public interface MRJobConfig {
   public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
   public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
 
+  public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+
   // This should be the directory where splits file gets localized on the node
   // running ApplicationMaster.
   public static final String JOB_SUBMIT_DIR = "jobSubmitDir";

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Mon Oct 24 08:44:54 2011
@@ -25,7 +25,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
@@ -72,6 +71,7 @@ class EventWriter {
   void flush() throws IOException {
     encoder.flush();
     out.flush();
+    out.hflush(); // presumably so a restarted AM can read the events - confirm
   }
 
   void close() throws IOException {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java Mon Oct 24 08:44:54 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.lib.output;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -56,6 +58,17 @@ public class NullOutputFormat<K, V> exte
       }
       public void setupJob(JobContext jobContext) { }
       public void setupTask(TaskAttemptContext taskContext) { }
+
+      @Override
+      public boolean isRecoverySupported() {
+        return true; // unconditional; recoverTask() here is a no-op
+      }
+
+      @Override
+      public void recoverTask(TaskAttemptContext taskContext)
+          throws IOException {
+        // Intentionally empty: nothing is done to recover the task.
+      }
     };
   }
 }