Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/04/26 10:01:05 UTC

svn commit: r1096692 - in /hadoop/mapreduce/branches/MR-279/mr-client: hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-clie...

Author: sharad
Date: Tue Apr 26 08:01:04 2011
New Revision: 1096692

URL: http://svn.apache.org/viewvc?rev=1096692&view=rev
Log:
Recovery of MR Application Master from failures.

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
Modified:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Apr 26 08:01:04 2011
@@ -56,6 +56,7 @@ public class JobHistoryEventHandler exte
     implements EventHandler<JobHistoryEvent> {
 
   private final AppContext context;
+  private final int startCount;
 
   private FileContext logDirFc; // log Dir FileContext
   private FileContext doneDirFc; // done Dir FileContext
@@ -80,9 +81,10 @@ public class JobHistoryEventHandler exte
   public static final FsPermission HISTORY_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0740); // rwxr-----
 
-  public JobHistoryEventHandler(AppContext context) {
+  public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
     this.context = context;
+    this.startCount = startCount;
   }
 
   @Override
@@ -344,8 +346,10 @@ public class JobHistoryEventHandler exte
   /**
    * Get the job history file path
    */
-  public static Path getJobHistoryFile(Path dir, JobId jobId) {
-    return new Path(dir, TypeConverter.fromYarn(jobId).toString());
+  private Path getJobHistoryFile(Path dir, JobId jobId) {
+    return new Path(dir, TypeConverter.fromYarn(jobId).toString() + "_" + 
+        startCount);
+
   }
 
 /*
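
With the start count appended, each life of the AM writes its own history file, and a restarted AM can find its predecessor's. A sketch of how the writer and reader sides pair up (the class and method names here are illustrative, not part of the patch; the real job name comes from TypeConverter.fromYarn(jobId).toString()):

    import org.apache.hadoop.fs.Path;

    class HistoryFileNaming {
      // Run N (startCount == N) writes its events to <dir>/<jobName>_N ...
      static Path writtenBy(Path dir, String jobName, int startCount) {
        return new Path(dir, jobName + "_" + startCount);
      }
      // ... and on restart, run N recovers by reading run N-1's file,
      // as RecoveryService.parse() does further down.
      static Path readBy(Path histDir, String jobName, int startCount) {
        return new Path(histDir, jobName + "_" + (startCount - 1));
      }
    }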

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Apr 26 08:01:04 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -56,6 +58,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
+import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
@@ -102,9 +106,10 @@ public class MRAppMaster extends Composi
 
   private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
 
-  private final Clock clock;
-
-  private ApplicationId appID;
+  private Clock clock;
+  private final int startCount;
+  private final ApplicationId appID;
+  private Set<TaskId> completedTasksFromPreviousRun;
   private AppContext context;
   private Dispatcher dispatcher;
   private ClientService clientService;
@@ -117,21 +122,16 @@ public class MRAppMaster extends Composi
       new JobTokenSecretManager();
 
   private Job job;
-  private int failCount = 0;
-  
-  public MRAppMaster(ApplicationId applicationId) {
-    this(applicationId, new SystemClock());
-  }
   
-  public MRAppMaster(ApplicationId applicationId, int failCount) {
-    this(applicationId);
-    this.failCount = failCount;
+  public MRAppMaster(ApplicationId applicationId, int startCount) {
+    this(applicationId, new SystemClock(), startCount);
   }
-  
-  public MRAppMaster(ApplicationId applicationId, Clock clock) {
+
+  public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) {
     super(MRAppMaster.class.getName());
     this.clock = clock;
     this.appID = applicationId;
+    this.startCount = startCount;
     LOG.info("Created MRAppMaster for application " + applicationId);
   }
 
@@ -139,8 +139,18 @@ public class MRAppMaster extends Composi
   public void init(final Configuration conf) {
     context = new RunningAppContext(); 
 
-    dispatcher = new AsyncDispatcher();
-    addIfService(dispatcher);
+     if (conf.getBoolean(YarnMRJobConfig.RECOVERY_ENABLE, false) 
+         && startCount > 1) {
+      LOG.info("Recovery is enabled. Will try to recover from previous life.");
+      Recovery recoveryServ = new RecoveryService(appID, clock, startCount);
+      addIfService(recoveryServ);
+      dispatcher = recoveryServ.getDispatcher();
+      clock = recoveryServ.getClock();
+      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
+    } else {
+      dispatcher = new AsyncDispatcher();
+      addIfService(dispatcher);
+    }
 
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context);
@@ -259,7 +269,7 @@ public class MRAppMaster extends Composi
     // create single job
     Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
                       taskAttemptListener, jobTokenSecretManager, fsTokens, 
-                      clock);
+                      clock, startCount, completedTasksFromPreviousRun);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
@@ -312,7 +322,8 @@ public class MRAppMaster extends Composi
 
   protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
       AppContext context) {
-    JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context);
+    JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context, 
+        getStartCount());
     return eventHandler;
   }
 
@@ -385,6 +396,10 @@ public class MRAppMaster extends Composi
     return appID;
   }
 
+  public int getStartCount() {
+    return startCount;
+  }
+
   public AppContext getContext() {
     return context;
   }
@@ -393,6 +408,10 @@ public class MRAppMaster extends Composi
     return dispatcher;
   }
 
+  public Set<TaskId> getCompletedTaskFromPreviousRun() {
+    return completedTasksFromPreviousRun;
+  }
+
   //Returns null if speculation is not enabled
   public Speculator getSpeculator() {
     return speculator;
@@ -503,7 +522,7 @@ public class MRAppMaster extends Composi
       applicationId.setClusterTimestamp(Long.valueOf(args[0]));
       applicationId.setId(Integer.valueOf(args[1]));
       int failCount = Integer.valueOf(args[2]);
-      MRAppMaster appMaster = new MRAppMaster(applicationId, failCount);
+      MRAppMaster appMaster = new MRAppMaster(applicationId, ++failCount);
       YarnConfiguration conf = new YarnConfiguration(new JobConf());
       conf.addResource(new Path(YARNApplicationConstants.JOB_CONF_FILE));
       conf.set(MRJobConfig.USER_NAME, 
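
Note the ++failCount passed from main(): args[2] carries the number of previous failures, so startCount is 1-based and recovery can only engage on a restart. A small sketch of the convention the rest of the patch relies on (the literal values are illustrative):

    // startCount semantics assumed throughout the patch:
    //   first launch:  args[2] == 0  ->  startCount == 1  (no recovery possible)
    //   first restart: args[2] == 1  ->  startCount == 2  (recovery reads file ..._1)
    int failCount = Integer.valueOf(args[2]);
    MRAppMaster appMaster = new MRAppMaster(applicationId, failCount + 1);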

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Apr 26 08:01:04 2011
@@ -127,6 +127,8 @@ public class JobImpl implements org.apac
   
   //final fields
   private final Clock clock;
+  private final int startCount;
+  private final Set<TaskId> completedTasksFromPreviousRun;
   private final Lock readLock;
   private final Lock writeLock;
   private final JobId jobId;
@@ -341,11 +343,14 @@ public class JobImpl implements org.apac
   public JobImpl(ApplicationId appID, Configuration conf,
       EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
-      Credentials fsTokenCredentials, Clock clock) {
+      Credentials fsTokenCredentials, Clock clock, int startCount, 
+      Set<TaskId> completedTasksFromPreviousRun) {
 
     this.jobId = recordFactory.newRecordInstance(JobId.class);
     this.conf = conf;
     this.clock = clock;
+    this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
+    this.startCount = startCount;
     jobId.setAppId(appID);
     jobId.setId(appID.getId());
     oldJobId = TypeConverter.fromYarn(jobId);
@@ -900,7 +905,7 @@ public class JobImpl implements org.apac
                 job.conf, splits[i], 
                 job.taskAttemptListener, 
                 job.committer, job.jobToken, job.fsTokens.getAllTokens(), 
-                job.clock);
+                job.clock, job.completedTasksFromPreviousRun, job.startCount);
         job.addTask(task);
       }
       LOG.info("Input size for job " + job.jobId + " = " + inputLength
@@ -915,7 +920,8 @@ public class JobImpl implements org.apac
                 job.remoteJobConfFile, 
                 job.conf, job.numMapTasks, 
                 job.taskAttemptListener, job.committer, job.jobToken,
-                job.fsTokens.getAllTokens(), job.clock);
+                job.fsTokens.getAllTokens(), job.clock, 
+                job.completedTasksFromPreviousRun, job.startCount);
         job.addTask(task);
       }
       LOG.info("Number of reduces for job " + job.jobId + " = "

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Tue Apr 26 08:01:04 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.security.token.Token;
@@ -46,9 +48,11 @@ public class MapTaskImpl extends TaskImp
       TaskSplitMetaInfo taskSplitMetaInfo,
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
+      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock, 
+      Set<TaskId> completedTasksFromPreviousRun, int startCount) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
-        conf, taskAttemptListener, committer, jobToken, fsTokens, clock);
+        conf, taskAttemptListener, committer, jobToken, fsTokens, clock, 
+        completedTasksFromPreviousRun, startCount);
     this.taskSplitMetaInfo = taskSplitMetaInfo;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Tue Apr 26 08:01:04 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.Collection;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -27,6 +28,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.security.token.Token;
@@ -42,9 +44,11 @@ public class ReduceTaskImpl extends Task
       EventHandler eventHandler, Path jobFile, Configuration conf,
       int numMapTasks, TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
+      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock, 
+      Set<TaskId> completedTasksFromPreviousRun, int startCount) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
-        taskAttemptListener, committer, jobToken, fsTokens, clock);
+        taskAttemptListener, committer, jobToken, fsTokens, clock,
+        completedTasksFromPreviousRun, startCount);
     this.numMapTasks = numMapTasks;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Apr 26 08:01:04 2011
@@ -989,30 +989,7 @@ public abstract class TaskAttemptImpl im
       String taskType = 
           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
       LOG.info("In TaskAttemptImpl taskType: " + taskType);
-      if (taskType.equals("MAP")) {
-          MapAttemptFinishedEvent mfe =
-            new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-            TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-            TaskAttemptState.SUCCEEDED.toString(),
-            taskAttempt.finishTime,
-            taskAttempt.finishTime, "hostname",
-            TaskAttemptState.SUCCEEDED.toString(),
-            TypeConverter.fromYarn(taskAttempt.getCounters()),null);
-            taskAttempt.eventHandler.handle(
-              new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe));
-      } else {
-          ReduceAttemptFinishedEvent rfe =
-            new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-            TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-            TaskAttemptState.SUCCEEDED.toString(),
-            taskAttempt.finishTime,
-            taskAttempt.finishTime,
-            taskAttempt.finishTime, "hostname",
-            TaskAttemptState.SUCCEEDED.toString(),
-            TypeConverter.fromYarn(taskAttempt.getCounters()),null);
-            taskAttempt.eventHandler.handle(
-              new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe));
-      }
+      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
           /*
       TaskAttemptFinishedEvent tfe =
           new TaskAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
@@ -1047,36 +1024,40 @@ public abstract class TaskAttemptImpl im
           taskAttempt.reportedStatus.diagnosticInfo.toString());
       taskAttempt.eventHandler.handle(
           new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), ta));
-      if (taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP) {
-        MapAttemptFinishedEvent mfe =
-           new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-           TaskAttemptState.FAILED.toString(),
-           taskAttempt.finishTime,
-           taskAttempt.finishTime, "hostname",
-           TaskAttemptState.FAILED.toString(),
-           TypeConverter.fromYarn(taskAttempt.getCounters()),null);
-           taskAttempt.eventHandler.handle(
-             new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe));
-      } else {
-         ReduceAttemptFinishedEvent rfe =
-           new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-           TaskAttemptState.FAILED.toString(),
-           taskAttempt.finishTime,
-           taskAttempt.finishTime,
-           taskAttempt.finishTime, "hostname",
-           TaskAttemptState.FAILED.toString(),
-           TypeConverter.fromYarn(taskAttempt.getCounters()),null);
-           taskAttempt.eventHandler.handle(
-             new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe));
-      }
+      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED);
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_FAILED));
     }
   }
 
+  private void logAttemptFinishedEvent(TaskAttemptState state) {
+    if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
+      MapAttemptFinishedEvent mfe =
+         new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+         TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+         state.toString(),
+         finishTime,
+         finishTime, "hostname",
+         state.toString(),
+         TypeConverter.fromYarn(getCounters()),null);
+         eventHandler.handle(
+           new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
+    } else {
+       ReduceAttemptFinishedEvent rfe =
+         new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+         TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+         state.toString(),
+         finishTime,
+         finishTime,
+         finishTime, "hostname",
+         state.toString(),
+         TypeConverter.fromYarn(getCounters()),null);
+         eventHandler.handle(
+           new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
+    }
+  }
+
   private static class TooManyFetchFailureTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @Override
@@ -1108,6 +1089,7 @@ public abstract class TaskAttemptImpl im
           taskAttempt.reportedStatus.diagnosticInfo.toString());
       taskAttempt.eventHandler.handle(
           new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tke));
+      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED);
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Apr 26 08:01:04 2011
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -219,7 +220,8 @@ public abstract class TaskImpl implement
       EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
-      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
+      Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+      Set<TaskId> completedTasksFromPreviousRun, int startCount) {
     this.conf = conf;
     this.clock = clock;
     this.jobFile = remoteJobConfFile;
@@ -242,6 +244,18 @@ public abstract class TaskImpl implement
     this.fsTokens = fsTokens;
     this.jobToken = jobToken;
 
+    if (completedTasksFromPreviousRun != null 
+        && completedTasksFromPreviousRun.contains(taskId)) {
+      LOG.info("Task is from previous run " + taskId);
+      startCount = startCount - 1;
+    }
+
+    //attempt ids are generated based on MR app startCount so that attempts
+    //from previous lives don't clash with those of the current one.
+    //this assumes that a task won't have more than 1000 attempts in a
+    //single life
+    nextAttemptNumber = (startCount - 1) * 1000;
+
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
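
A worked example of the attempt-id arithmetic above, under the stated assumption of at most 1000 attempts per life:

    // Second life of the AM (startCount == 2):
    //   brand-new task:  nextAttemptNumber = (2 - 1) * 1000 = 1000, 1001, ...
    //   recovered task:  startCount was decremented to 1, so
    //                    nextAttemptNumber = (1 - 1) * 1000 = 0, 1, ...
    // Replayed attempts therefore keep their original run-1 ids, while
    // genuinely new attempts in run 2 start at 1000 and cannot collide.
    nextAttemptNumber = (startCount - 1) * 1000;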

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java?rev=1096692&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java Tue Apr 26 08:01:04 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.v2.app.recover;
+
+import org.apache.hadoop.yarn.Clock;
+
+class ControlledClock implements Clock {
+  private long time = -1;
+  private final Clock actualClock;
+  ControlledClock(Clock actualClock) {
+    this.actualClock = actualClock;
+  }
+  synchronized void setTime(long time) {
+    this.time = time;
+  }
+  synchronized void reset() {
+    time = -1;
+  }
+
+  @Override
+  public synchronized long getTime() {
+    if (time != -1) {
+      return time;
+    }
+    return actualClock.getTime();
+  }
+
+}
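
ControlledClock lets the recovery service pin "now" to timestamps read from the history file, so recovered tasks keep their original start and finish times. A usage sketch from within the recover package (the timestamp value is illustrative):

    long startTimeFromHistory = 1303804864000L;          // illustrative
    ControlledClock clock = new ControlledClock(new SystemClock());
    clock.setTime(startTimeFromHistory);  // during replay getTime() returns this
    assert clock.getTime() == startTimeFromHistory;
    clock.reset();                        // recovery done: back to the real clock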

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java?rev=1096692&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java Tue Apr 26 08:01:04 2011
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.recover;
+
+import java.util.Set;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.Dispatcher;
+
+public interface Recovery {
+
+  Dispatcher getDispatcher();
+
+  Clock getClock();
+  
+  Set<TaskId> getCompletedTasks();
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1096692&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Apr 26 08:01:04 2011
@@ -0,0 +1,363 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.recover;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+
+/*
+ * Recovers the completed tasks from the previous life of the Application
+ * Master. The completed tasks are deciphered from the history file of the
+ * previous life. The recovery service intercepts and replays the events for
+ * completed tasks. While recovery is in progress, the scheduling of new
+ * tasks is delayed by buffering the task schedule events.
+ * The recovery service controls the clock while recovery is in progress.
+ */
+
+//TODO:
+//task cleanup for all non completed tasks
+//change job output committer to have 
+//    - atomic job output promotion
+//    - recover output of completed tasks
+
+public class RecoveryService extends CompositeService implements Recovery {
+
+  private static final Log LOG = LogFactory.getLog(RecoveryService.class);
+
+  private final ApplicationId appID;
+  private final Dispatcher dispatcher;
+  private final ControlledClock clock;
+  private final int startCount;
+
+  private JobInfo jobInfo = null;
+  private final Map<TaskId, TaskInfo> completedTasks =
+    new HashMap<TaskId, TaskInfo>();
+
+  private final List<TaskEvent> pendingTaskScheduleEvents =
+    new ArrayList<TaskEvent>();
+
+  private volatile boolean recoveryMode = false;
+
+  public RecoveryService(ApplicationId appID, Clock clock, int startCount) {
+    super("RecoveringDispatcher");
+    this.appID = appID;
+    this.startCount = startCount;
+    this.dispatcher = new RecoveryDispatcher();
+    this.clock = new ControlledClock(clock);
+    if (dispatcher instanceof Service) {
+      addService((Service) dispatcher);
+    }
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // parse the history file
+    try {
+      parse();
+      if (completedTasks.size() > 0) {
+        recoveryMode = true;
+        LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " + 
+            "TO RECOVER " + completedTasks.size());
+        LOG.info("Job launch time " + jobInfo.getLaunchTime());
+        clock.setTime(jobInfo.getLaunchTime());
+      }
+    } catch (IOException e) {
+      LOG.warn(e);
+      LOG.warn("Could not parse the old history file. Aborting recovery. "
+          + "Starting afresh.");
+    }
+  }
+
+  @Override
+  public Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  @Override
+  public Clock getClock() {
+    return clock;
+  }
+
+  @Override
+  public Set<TaskId> getCompletedTasks() {
+    return completedTasks.keySet();
+  }
+
+  private void parse() throws IOException {
+    // TODO: parse history file based on startCount
+    String jobName = TypeConverter.fromYarn(appID).toString();
+    String defaultStagingDir = getConfig().get(
+        YARNApplicationConstants.APPS_STAGING_DIR_KEY)
+        + "/history/staging";
+    String jobhistoryDir = getConfig().get(
+        YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir);
+    FSDataInputStream in = null;
+    Path historyFile = null;
+    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
+        new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
+        getConfig());
+    historyFile = fc.makeQualified(new Path(histDirPath, jobName + "_" + 
+        (startCount -1))); //read the previous history file
+    in = fc.open(historyFile);
+    JobHistoryParser parser = new JobHistoryParser(in);
+    jobInfo = parser.parse();
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+        .getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        completedTasks
+            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+        LOG.info("Read from history task "
+            + TypeConverter.toYarn(taskInfo.getTaskId()));
+      }
+    }
+    LOG.info("Read completed tasks from history "
+        + completedTasks.size());
+  }
+
+  class RecoveryDispatcher extends AsyncDispatcher {
+    private final EventHandler actualHandler;
+    private final EventHandler handler;
+
+    RecoveryDispatcher() {
+      actualHandler = super.getEventHandler();
+      handler = new InterceptingEventHandler(actualHandler);
+    }
+
+    @Override
+    public void dispatch(Event event) {
+      if (recoveryMode) {
+        if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
+          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+              .getTaskAttemptID());
+          LOG.info("Attempt start time " + attInfo.getStartTime());
+          clock.setTime(attInfo.getStartTime());
+
+        } else if (event.getType() == TaskAttemptEventType.TA_DONE
+            || event.getType() == TaskAttemptEventType.TA_FAILMSG
+            || event.getType() == TaskAttemptEventType.TA_KILL) {
+          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+              .getTaskAttemptID());
+          LOG.info("Attempt finish time " + attInfo.getFinishTime());
+          clock.setTime(attInfo.getFinishTime());
+        }
+
+        else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
+            || event.getType() == TaskEventType.T_ATTEMPT_KILLED
+            || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
+          TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
+          LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
+          TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
+              .getTaskId());
+          taskInfo.getAllTaskAttempts().remove(
+              TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
+          // remove the task info from completed tasks if all attempts are
+          // recovered
+          if (taskInfo.getAllTaskAttempts().size() == 0) {
+            completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
+            // checkForRecoveryComplete
+            LOG.info("CompletedTasks() " + completedTasks.size());
+            if (completedTasks.size() == 0) {
+              recoveryMode = false;
+              clock.reset();
+              LOG.info("Setting the recovery mode to false. " +
+                 "Recovery is complete!");
+
+              // send all pending tasks schedule events
+              for (TaskEvent tEv : pendingTaskScheduleEvents) {
+                actualHandler.handle(tEv);
+              }
+
+            }
+          }
+        }
+      }
+      super.dispatch(event);
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return handler;
+    }
+  }
+
+  private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
+    TaskInfo taskInfo = completedTasks.get(id.getTaskId());
+    return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
+  }
+
+  private class InterceptingEventHandler implements EventHandler {
+    EventHandler actualHandler;
+
+    InterceptingEventHandler(EventHandler actualHandler) {
+      this.actualHandler = actualHandler;
+    }
+
+    @Override
+    public void handle(Event event) {
+      if (!recoveryMode) {
+        // delegate to the dispatcher one
+        actualHandler.handle(event);
+        return;
+      }
+
+      else if (event.getType() == TaskEventType.T_SCHEDULE) {
+        TaskEvent taskEvent = (TaskEvent) event;
+        // delay the scheduling of new tasks till previous ones are recovered
+        if (completedTasks.get(taskEvent.getTaskID()) == null) {
+          LOG.debug("Adding to pending task events "
+              + taskEvent.getTaskID());
+          pendingTaskScheduleEvents.add(taskEvent);
+          return;
+        }
+      }
+
+      else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
+        TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
+        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+        LOG.debug("CONTAINER_REQ " + aId);
+        sendAssignedEvent(aId, attInfo);
+        return;
+      }
+
+      else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
+        TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
+        LOG.debug("TASK_CLEAN");
+        actualHandler.handle(new TaskAttemptEvent(aId,
+            TaskAttemptEventType.TA_CLEANUP_DONE));
+        return;
+      }
+
+      else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
+        TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
+            .getTaskAttemptID();
+        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+        actualHandler.handle(new TaskAttemptEvent(aId,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+        // send the status update event
+        sendStatusUpdateEvent(aId, attInfo);
+
+        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getState());
+        switch (state) {
+        case SUCCEEDED:
+          // send the done event
+          LOG.info("Sending done event to " + aId);
+          actualHandler.handle(new TaskAttemptEvent(aId,
+              TaskAttemptEventType.TA_DONE));
+          break;
+        case KILLED:
+          LOG.info("Sending kill event to " + aId);
+          actualHandler.handle(new TaskAttemptEvent(aId,
+              TaskAttemptEventType.TA_KILL));
+          break;
+        default:
+          LOG.info("Sending fail event to " + aId);
+          actualHandler.handle(new TaskAttemptEvent(aId,
+              TaskAttemptEventType.TA_FAILMSG));
+          break;
+        }
+        return;
+      }
+
+      // delegate to the actual handler
+      actualHandler.handle(event);
+    }
+
+    private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
+        TaskAttemptInfo attemptInfo) {
+      LOG.info("Sending status update event to " + yarnAttemptID);
+      TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+      taskAttemptStatus.id = yarnAttemptID;
+      taskAttemptStatus.progress = 1.0f;
+      taskAttemptStatus.diagnosticInfo = "";
+      taskAttemptStatus.stateString = attemptInfo.getState();
+      // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
+      taskAttemptStatus.phase = Phase.CLEANUP;
+      org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters();
+      if (cntrs == null) {
+        taskAttemptStatus.counters = null;
+      } else {
+        taskAttemptStatus.counters = TypeConverter.toYarn(attemptInfo
+            .getCounters());
+      }
+      actualHandler.handle(new TaskAttemptStatusUpdateEvent(
+          taskAttemptStatus.id, taskAttemptStatus));
+    }
+
+    private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
+        TaskAttemptInfo attemptInfo) {
+      LOG.info("Sending assigned event to " + yarnAttemptID);
+      ContainerId cId = RecordFactoryProvider.getRecordFactory(null)
+          .newRecordInstance(ContainerId.class);
+      actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
+          cId, null, attemptInfo.getHostname() + ":" + 
+          attemptInfo.getHttpPort(), null));
+    }
+  }
+
+}
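
Pulling the pieces together, the replay of a single SUCCEEDED attempt runs roughly as follows (a summary of the interception above, not new behavior):

 1. T_SCHEDULE for a completed task passes straight through (only tasks absent
    from completedTasks are buffered), so the task asks for a container.
 2. CONTAINER_REQ is answered locally by sendAssignedEvent(); no real container
    is allocated.
 3. CONTAINER_REMOTE_LAUNCH is short-circuited: TA_CONTAINER_LAUNCHED is sent
    (the RecoveryDispatcher pins the clock to the historical start time), then
    a final status update (progress 1.0, counters from history), then TA_DONE
    (clock pinned to the historical finish time).
 4. The resulting T_ATTEMPT_SUCCEEDED removes the attempt from completedTasks;
    once that map is empty, recoveryMode is switched off, the clock is reset,
    and the buffered T_SCHEDULE events are released.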

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Apr 26 08:01:04 2011
@@ -85,10 +85,20 @@ public class MRApp extends MRAppMaster {
   //if true, tasks complete automatically as soon as they are launched
   protected boolean autoComplete = false;
 
+  static ApplicationId applicationId;
+
+  static {
+    applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+    applicationId.setClusterTimestamp(0);
+    applicationId.setId(0);
+  }
+
   public MRApp(int maps, int reduces, boolean autoComplete) {
-    
-    super(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
-        ApplicationId.class));
+    this(maps, reduces, autoComplete, 1);
+  }
+
+  public MRApp(int maps, int reduces, boolean autoComplete, int startCount) {
+    super(applicationId, startCount);
     this.maps = maps;
     this.reduces = reduces;
     this.autoComplete = autoComplete;
@@ -163,10 +173,14 @@ public class MRApp extends MRAppMaster {
       JobReport jobReport = job.getReport();
       Assert.assertTrue("Job start time is not less than finish time",
           jobReport.getStartTime() < jobReport.getFinishTime());
+      System.out.println("Job start time :" + jobReport.getStartTime());
+      System.out.println("Job finish time :" + jobReport.getFinishTime());
       Assert.assertTrue("Job finish time is in future",
           jobReport.getFinishTime() < System.currentTimeMillis());
       for (Task task : job.getTasks().values()) {
         TaskReport taskReport = task.getReport();
+        System.out.println("Task start time : " + taskReport.getStartTime());
+        System.out.println("Task finish time : " + taskReport.getFinishTime());
         Assert.assertTrue("Task start time is not less than finish time",
             taskReport.getStartTime() < taskReport.getFinishTime());
         for (TaskAttempt attempt : task.getAttempts().values()) {
@@ -310,7 +324,8 @@ public class MRApp extends MRAppMaster {
     public TestJob(ApplicationId appID, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock) {
       super(appID, new Configuration(), eventHandler, taskAttemptListener,
-          new JobTokenSecretManager(), new Credentials(), clock);
+          new JobTokenSecretManager(), new Credentials(), clock, getStartCount(), 
+          getCompletedTaskFromPreviousRun());
 
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1096692&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Apr 26 08:01:04 2011
@@ -0,0 +1,185 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.junit.Test;
+
+public class TestRecovery {
+
+  private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+
+  @Test
+  public void testCrashed() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRApp(2, 1, false, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    long jobStartTime = job.getReport().getStartTime();
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //the RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    // reduces must be in NEW state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.NEW, reduceTask.getReport().getTaskState());
+    
+    //send the fail signal to the 1st map task attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_FAILMSG));
+    
+    app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
+    
+    while (mapTask1.getAttempts().size() != 2) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    TaskAttempt task1Attempt2 = itr.next();
+    
+    app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+
+    //send the kill signal to the 1st map 2nd attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt2.getID(),
+            TaskAttemptEventType.TA_KILL));
+    
+    app.waitForState(task1Attempt2, TaskAttemptState.KILLED);
+    
+    while (mapTask1.getAttempts().size() != 3) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    itr.next();
+    TaskAttempt task1Attempt3 = itr.next();
+    
+    app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 1st map 3rd attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt3.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    long task1StartTime = mapTask1.getReport().getStartTime();
+    long task1FinishTime = mapTask1.getReport().getFinishTime();
+    
+    //stop the app
+    app.stop();
+    
+    //rerun
+    //in the rerun, the 1st map will be recovered from the previous run
+    app = new MRApp(2, 1, false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(YarnMRJobConfig.RECOVERY_ENABLE, true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+    
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //the RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+    
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    Assert.assertEquals("Job Start time not correct",
+        jobStartTime, job.getReport().getStartTime());
+    Assert.assertEquals("Task Start time not correct",
+        task1StartTime, mapTask1.getReport().getStartTime());
+    Assert.assertEquals("Task Finish time not correct",
+        task1FinishTime, mapTask1.getReport().getFinishTime());
+  }
+
+  public static void main(String[] arg) throws Exception {
+    TestRecovery test = new TestRecovery();
+    test.testCrashed();
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Tue Apr 26 08:01:04 2011
@@ -53,4 +53,7 @@ public class YarnMRJobConfig {
       "address.webapp";
   public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
 	  "0.0.0.0:19888";
+
+  public static final String RECOVERY_ENABLE
+      = "yarn.mapreduce.job.recovery.enable";
 }
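
As TestRecovery above does, a job must opt in before any of this engages:

    Configuration conf = new Configuration();
    // yarn.mapreduce.job.recovery.enable defaults to false; even when true,
    // recovery only kicks in for a restarted AM (startCount > 1).
    conf.setBoolean(YarnMRJobConfig.RECOVERY_ENABLE, true);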

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Tue Apr 26 08:01:04 2011
@@ -68,6 +68,7 @@ class EventWriter {
   
   void flush() throws IOException { 
     encoder.flush();
+    out.hflush();
   }
 
   void close() throws IOException {
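
The added hflush() is what makes the history file usable for recovery: without it, buffered events could vanish with a crashed AM, leaving the next life's RecoveryService a truncated file to parse. The flush path, annotated (a sketch; out is the FSDataOutputStream backing the history file):

    void flush() throws IOException {
      encoder.flush();  // drain the Avro encoder's buffer into 'out'
      out.hflush();     // push the bytes to the datanodes so a new reader --
                        // the next AM's RecoveryService -- sees every event
    }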