You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/14 01:08:14 UTC

svn commit: r1243753 - in /hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...

Author: vinodkv
Date: Tue Feb 14 00:08:13 2012
New Revision: 1243753

URL: http://svn.apache.org/viewvc?rev=1243753&view=rev
Log:
MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then the recovery. (vinodkv)
svn merge --ignore-ancestry -c 1243752 ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt Tue Feb 14 00:08:13 2012
@@ -741,6 +741,9 @@ Release 0.23.1 - 2012-02-08 
     MAPREDUCE-3843. Job summary log file found missing on the RM host 
     (Anupam Seth via tgraves)
 
+    MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then
+    the recovery. (vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Feb 14 00:08:13 2012
@@ -244,7 +244,7 @@ public class JobHistoryEventHandler exte
         while (!stopped && !Thread.currentThread().isInterrupted()) {
 
           // Log the size of the history-event-queue every so often.
-          if (eventCounter % 1000 == 0) {
+          if (eventCounter != 0 && eventCounter % 1000 == 0) {
             eventCounter = 0;
             LOG.info("Size of the JobHistory event queue is "
                 + eventQueue.size());
@@ -464,8 +464,10 @@ public class JobHistoryEventHandler exte
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
-        LOG.info("In HistoryEventHandler "
-            + event.getHistoryEvent().getEventType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("In HistoryEventHandler "
+              + event.getHistoryEvent().getEventType());
+        }
       } catch (IOException e) {
         LOG.error("Error writing History Event: " + event.getHistoryEvent(),
             e);

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Feb 14 00:08:13 2012
@@ -26,7 +26,6 @@ import java.security.PrivilegedException
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.TypeC
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
@@ -123,7 +123,7 @@ import org.apache.hadoop.yarn.util.Conve
  * The information is shared across different components using AppContext.
  */
 
-@SuppressWarnings("deprecation")
+@SuppressWarnings("rawtypes")
 public class MRAppMaster extends CompositeService {
 
   private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
@@ -138,7 +138,7 @@ public class MRAppMaster extends Composi
   private final int nmPort;
   private final int nmHttpPort;
   protected final MRAppMetrics metrics;
-  private Set<TaskId> completedTasksFromPreviousRun;
+  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private List<AMInfo> amInfos;
   private AppContext context;
   private Dispatcher dispatcher;
@@ -596,7 +596,7 @@ public class MRAppMaster extends Composi
     return dispatcher;
   }
 
-  public Set<TaskId> getCompletedTaskFromPreviousRun() {
+  public Map<TaskId, TaskInfo> getCompletedTaskFromPreviousRun() {
     return completedTasksFromPreviousRun;
   }
 
@@ -737,7 +737,6 @@ public class MRAppMaster extends Composi
       return jobs;
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     public EventHandler getEventHandler() {
       return dispatcher.getEventHandler();

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Feb 14 00:08:13 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
@@ -133,7 +134,7 @@ public class JobImpl implements org.apac
   private float cleanupWeight = 0.05f;
   private float mapWeight = 0.0f;
   private float reduceWeight = 0.0f;
-  private final Set<TaskId> completedTasksFromPreviousRun;
+  private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private final List<AMInfo> amInfos;
   private final Lock readLock;
   private final Lock writeLock;
@@ -376,7 +377,7 @@ public class JobImpl implements org.apac
       TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock,
-      Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
+      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
       OutputCommitter committer, boolean newApiCommitter, String userName,
       long appSubmitTime, List<AMInfo> amInfos) {
     this.applicationAttemptId = applicationAttemptId;

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Tue Feb 14 00:08:13 2012
@@ -19,13 +19,14 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -38,7 +39,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
 public class MapTaskImpl extends TaskImpl {
 
   private final TaskSplitMetaInfo taskSplitMetaInfo;
@@ -49,7 +50,7 @@ public class MapTaskImpl extends TaskImp
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock, 
-      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
         conf, taskAttemptListener, committer, jobToken, fsTokens, clock, 

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Tue Feb 14 00:08:13 2012
@@ -19,13 +19,14 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@@ -37,7 +38,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
 public class ReduceTaskImpl extends TaskImpl {
   
   private final int numMapTasks;
@@ -47,7 +48,7 @@ public class ReduceTaskImpl extends Task
       int numMapTasks, TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock, 
-      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
         taskAttemptListener, committer, jobToken, fsTokens, clock,

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Feb 14 00:08:13 2012
@@ -18,13 +18,14 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -35,8 +36,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
@@ -66,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
@@ -208,8 +213,23 @@ public abstract class TaskImpl implement
 
   private final StateMachine<TaskState, TaskEventType, TaskEvent>
     stateMachine;
-  
-  protected int nextAttemptNumber;
+
+  // By default, the next TaskAttempt number is zero. Changes during recovery  
+  protected int nextAttemptNumber = 0;
+  private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
+      new ArrayList<TaskAttemptInfo>();
+
+  private static final class RecoverdAttemptsComparator implements
+      Comparator<TaskAttemptInfo> {
+    @Override
+    public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
+      long diff = attempt1.getStartTime() - attempt2.getStartTime();
+      return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+    }
+  }
+
+  private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
+      new RecoverdAttemptsComparator();
 
   //should be set to one which comes first
   //saying COMMIT_PENDING
@@ -230,7 +250,7 @@ public abstract class TaskImpl implement
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
-      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics) {
     this.conf = conf;
     this.clock = clock;
@@ -243,10 +263,7 @@ public abstract class TaskImpl implement
     //  have a convention that none of the overrides depends on any
     //  fields that need initialization.
     maxAttempts = getMaxAttempts();
-    taskId = recordFactory.newRecordInstance(TaskId.class);
-    taskId.setJobId(jobId);
-    taskId.setId(partition);
-    taskId.setTaskType(taskType);
+    taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType);
     this.partition = partition;
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
@@ -255,18 +272,38 @@ public abstract class TaskImpl implement
     this.jobToken = jobToken;
     this.metrics = metrics;
 
+    // See if this is from a previous generation.
     if (completedTasksFromPreviousRun != null
-        && completedTasksFromPreviousRun.contains(taskId)) {
+        && completedTasksFromPreviousRun.containsKey(taskId)) {
+      // This task has TaskAttempts from previous generation. We have to replay
+      // them.
       LOG.info("Task is from previous run " + taskId);
-      startCount = startCount - 1;
+      TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
+      Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
+          taskInfo.getAllTaskAttempts();
+      taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
+      taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
+      Collections.sort(taskAttemptsFromPreviousGeneration,
+        RECOVERED_ATTEMPTS_COMPARATOR);
+    }
+
+    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
+      // All the previous attempts are exhausted, now start with a new
+      // generation.
+
+      // All the new TaskAttemptIDs are generated based on MR
+      // ApplicationAttemptID so that attempts from previous lives don't
+      // over-step the current one. This assumes that a task won't have more
+      // than 1000 attempts in its single generation, which is very reasonable.
+      // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
+      // and requires serious medical attention.
+      nextAttemptNumber = (startCount - 1) * 1000;
+    } else {
+      // There are still some TaskAttempts from previous generation, use them
+      nextAttemptNumber =
+          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
     }
 
-    //attempt ids are generated based on MR app startCount so that attempts
-    //from previous lives don't overstep the current one.
-    //this assumes that a task won't have more than 1000 attempts in its single 
-    //life
-    nextAttemptNumber = (startCount - 1) * 1000;
-
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
@@ -390,17 +427,23 @@ public abstract class TaskImpl implement
 
   //this is always called in read/write lock
   private long getLaunchTime() {
-    long launchTime = 0;
+    long taskLaunchTime = 0;
+    boolean launchTimeSet = false;
     for (TaskAttempt at : attempts.values()) {
-      //select the least launch time of all attempts
-      if (launchTime == 0  || launchTime > at.getLaunchTime()) {
-        launchTime = at.getLaunchTime();
+      // select the least launch time of all attempts
+      long attemptLaunchTime = at.getLaunchTime();
+      if (attemptLaunchTime != 0 && !launchTimeSet) {
+        // For the first non-zero launch time
+        launchTimeSet = true;
+        taskLaunchTime = attemptLaunchTime;
+      } else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) {
+        taskLaunchTime = attemptLaunchTime;
       }
     }
-    if (launchTime == 0) {
+    if (!launchTimeSet) {
       return this.scheduledTime;
     }
-    return launchTime;
+    return taskLaunchTime;
   }
 
   //this is always called in read/write lock
@@ -525,7 +568,16 @@ public abstract class TaskImpl implement
         attempts.put(attempt.getID(), attempt);
         break;
     }
-    ++nextAttemptNumber;
+
+    // Update nextAttemptNumber
+    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
+      ++nextAttemptNumber;
+    } else {
+      // There are still some TaskAttempts from previous generation, use them
+      nextAttemptNumber =
+          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
+    }
+
     ++numberUncompletedAttempts;
     //schedule the nextAttemptNumber
     if (failedAttempts > 0) {

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java Tue Feb 14 00:08:13 2012
@@ -19,8 +19,9 @@
 package org.apache.hadoop.mapreduce.v2.app.recover;
 
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.yarn.Clock;
@@ -32,7 +33,7 @@ public interface Recovery {
 
   Clock getClock();
   
-  Set<TaskId> getCompletedTasks();
+  Map<TaskId, TaskInfo> getCompletedTasks();
   
   List<AMInfo> getAMInfos();
 }

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Feb 14 00:08:13 2012
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -153,8 +153,8 @@ public class RecoveryService extends Com
   }
 
   @Override
-  public Set<TaskId> getCompletedTasks() {
-    return completedTasks.keySet();
+  public Map<TaskId, TaskInfo> getCompletedTasks() {
+    return completedTasks;
   }
 
   @Override
@@ -189,7 +189,8 @@ public class RecoveryService extends Com
         getConfig());
     //read the previous history file
     historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
-        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));          
+        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));  
+    LOG.info("History file is at " + historyFile);
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
@@ -242,7 +243,7 @@ public class RecoveryService extends Com
         if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
           TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
               .getTaskAttemptID());
-          LOG.info("Attempt start time " + attInfo.getStartTime());
+          LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
           clock.setTime(attInfo.getStartTime());
 
         } else if (event.getType() == TaskAttemptEventType.TA_DONE
@@ -250,7 +251,7 @@ public class RecoveryService extends Com
             || event.getType() == TaskAttemptEventType.TA_KILL) {
           TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
               .getTaskAttemptID());
-          LOG.info("Attempt finish time " + attInfo.getFinishTime());
+          LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
           clock.setTime(attInfo.getFinishTime());
         }
 
@@ -380,17 +381,17 @@ public class RecoveryService extends Com
           }
           
           // send the done event
-          LOG.info("Sending done event to " + aId);
+          LOG.info("Sending done event to recovered attempt " + aId);
           actualHandler.handle(new TaskAttemptEvent(aId,
               TaskAttemptEventType.TA_DONE));
           break;
         case KILLED:
-          LOG.info("Sending kill event to " + aId);
+          LOG.info("Sending kill event to recovered attempt " + aId);
           actualHandler.handle(new TaskAttemptEvent(aId,
               TaskAttemptEventType.TA_KILL));
           break;
         default:
-          LOG.info("Sending fail event to " + aId);
+          LOG.info("Sending fail event to recovered attempt " + aId);
           actualHandler.handle(new TaskAttemptEvent(aId,
               TaskAttemptEventType.TA_FAILMSG));
           break;

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Feb 14 00:08:13 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
 import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -74,7 +76,14 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
-
+  /**
+   * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
+   * completely disappears because of failed launch, one attempt gets killed and
+   * one attempt succeeds. AM crashes after the first task finishes and
+   * recovers completely and succeeds in the second generation.
+   * 
+   * @throws Exception
+   */
   @Test
   public void testCrashed() throws Exception {
 
@@ -112,7 +121,8 @@ public class TestRecovery {
     // reduces must be in NEW state
     Assert.assertEquals("Reduce Task state not correct",
         TaskState.RUNNING, reduceTask.getReport().getTaskState());
-    
+
+    /////////// Play some games with the TaskAttempts of the first task //////
     //send the fail signal to the 1st map task attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
@@ -120,42 +130,68 @@ public class TestRecovery {
             TaskAttemptEventType.TA_FAILMSG));
     
     app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
-    
-    while (mapTask1.getAttempts().size() != 2) {
+
+    int timeOut = 0;
+    while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
       Thread.sleep(2000);
       LOG.info("Waiting for next attempt to start");
     }
+    Assert.assertEquals(2, mapTask1.getAttempts().size());
     Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
     itr.next();
     TaskAttempt task1Attempt2 = itr.next();
     
-    app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+    // This attempt will automatically fail because of the way ContainerLauncher
+    // is set up
+    // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
+    app.getContext().getEventHandler().handle(
+      new TaskAttemptEvent(task1Attempt2.getID(),
+        TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+    app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
 
-    //send the kill signal to the 1st map 2nd attempt
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Assert.assertEquals(3, mapTask1.getAttempts().size());
+    itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    itr.next();
+    TaskAttempt task1Attempt3 = itr.next();
+    
+    app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+
+    //send the kill signal to the 1st map 3rd attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
-            task1Attempt2.getID(),
+            task1Attempt3.getID(),
             TaskAttemptEventType.TA_KILL));
     
-    app.waitForState(task1Attempt2, TaskAttemptState.KILLED);
-    
-    while (mapTask1.getAttempts().size() != 3) {
+    app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
+
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
       Thread.sleep(2000);
       LOG.info("Waiting for next attempt to start");
     }
+    Assert.assertEquals(4, mapTask1.getAttempts().size());
     itr = mapTask1.getAttempts().values().iterator();
     itr.next();
     itr.next();
-    TaskAttempt task1Attempt3 = itr.next();
+    itr.next();
+    TaskAttempt task1Attempt4 = itr.next();
     
-    app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+    app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
 
-    //send the done signal to the 1st map 3rd attempt
+    //send the done signal to the 1st map 4th attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
-            task1Attempt3.getID(),
+            task1Attempt4.getID(),
             TaskAttemptEventType.TA_DONE));
 
+    /////////// End of games with the TaskAttempts of the first task //////
+
     //wait for first map task to complete
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
     long task1StartTime = mapTask1.getReport().getStartTime();
@@ -552,7 +588,7 @@ public class TestRecovery {
   }
 
 
-  class MRAppWithHistory extends MRApp {
+  static class MRAppWithHistory extends MRApp {
     public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart, int startCount) {
       super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
@@ -567,7 +603,17 @@ public class TestRecovery {
 
     @Override
     protected ContainerLauncher createContainerLauncher(AppContext context) {
-      MockContainerLauncher launcher = new MockContainerLauncher();
+      MockContainerLauncher launcher = new MockContainerLauncher() {
+        @Override
+        public void handle(ContainerLauncherEvent event) {
+          TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+          // Pass everything except the 2nd attempt of the first task.
+          if (taskAttemptID.getId() != 1
+              || taskAttemptID.getTaskId().getId() != 0) {
+            super.handle(event);
+          }
+        }
+      };
       launcher.shufflePort = 5467;
       return launcher;
     }
@@ -581,7 +627,7 @@ public class TestRecovery {
     }
   }
 
-  class RecoveryServiceWithCustomDispatcher extends RecoveryService {
+  static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
 
     public RecoveryServiceWithCustomDispatcher(
         ApplicationAttemptId applicationAttemptId, Clock clock,

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Feb 14 00:08:13 2012
@@ -25,7 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -72,7 +73,7 @@ public class TestTaskImpl {
   private Path remoteJobConfFile;
   private Collection<Token<? extends TokenIdentifier>> fsTokens;
   private Clock clock;
-  private Set<TaskId> completedTasksFromPreviousRun;
+  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
   private TaskImpl mockTask;
   private ApplicationId appId;
@@ -96,7 +97,7 @@ public class TestTaskImpl {
         TaskAttemptListener taskAttemptListener, OutputCommitter committer,
         Token<JobTokenIdentifier> jobToken,
         Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
-        Set<TaskId> completedTasksFromPreviousRun, int startCount,
+        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
         MRAppMetrics metrics) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener, committer, 

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1243753&r1=1243752&r2=1243753&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Tue Feb 14 00:08:13 2012
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
-@SuppressWarnings("deprecation")
 public class TypeConverter {
 
   private static RecordFactory recordFactory;
@@ -116,8 +115,8 @@ public class TypeConverter {
   }
 
   public static org.apache.hadoop.mapred.TaskID fromYarn(TaskId id) {
-    return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), fromYarn(id.getTaskType()),
-        id.getId());
+    return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()),
+      fromYarn(id.getTaskType()), id.getId());
   }
 
   public static TaskId toYarn(org.apache.hadoop.mapreduce.TaskID id) {