Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2013/04/11 07:02:46 UTC

svn commit: r1466770 [2/2] - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Thu Apr 11 05:02:45 2013
@@ -18,10 +18,21 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -31,36 +42,66 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.Event;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRecovery {
@@ -75,6 +116,7 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
+
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -1011,6 +1053,423 @@ public class TestRecovery {
     app.verifyCompleted();
   }
 
+  @Test
+  public void testRecoverySuccessAttempt() {
+    LOG.info("--- START: testRecoverySuccessAttempt ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 1L);
+  }
+
+  @Test
+  public void testRecoveryAllFailAttempts() {
+    LOG.info("--- START: testRecoveryAllFailAttempts ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.TASK_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsFail() {
+    LOG.info("--- START:  testRecoveryTaskSuccessAllAttemptsFail ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.FAILED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+    finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+    // Expect one new attempt to be scheduled, since no successful attempt was found.
+    TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
+    finalAttemptStates.put(taId3, TaskAttemptState.NEW);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsSucceed() {
+    LOG.info("--- START:  testRecoveryTaskSuccessAllAttemptsFail ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  @Test
+  public void testRecoveryAllAttemptsKilled() {
+    LOG.info("--- START:  testRecoveryAllAttemptsKilled ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
+    finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+    jobHistoryEvents.add(EventType.TASK_FAILED);
+    recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
+      Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
+      ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
+      long expectedMapLaunches, long expectedFailedMaps) {
+
+    assertEquals("Final State of Task", finalState, checkTask.getState());
+
+    Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
+        checkTask.getAttempts();
+    assertEquals("Expected Number of Task Attempts",
+        finalAttemptStates.size(), recoveredAttempts.size());
+    for (TaskAttemptID taID : finalAttemptStates.keySet()) {
+      assertEquals("Expected Task Attempt State",
+          finalAttemptStates.get(taID),
+          recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
+    }
+
+    Iterator<Event> ie = arg.getAllValues().iterator();
+    int eventNum = 0;
+    long totalLaunchedMaps = 0;
+    long totalFailedMaps = 0;
+    boolean jobTaskEventReceived = false;
+
+    while (ie.hasNext()) {
+      Object current = ie.next();
+      ++eventNum;
+      LOG.info(eventNum + " " + current.getClass().getName());
+      if (current instanceof JobHistoryEvent) {
+        JobHistoryEvent jhe = (JobHistoryEvent) current;
+        LOG.info(expectedJobHistoryEvents.get(0).toString() + " " +
+            jhe.getHistoryEvent().getEventType().toString() + " " +
+            jhe.getJobID());
+        assertEquals(expectedJobHistoryEvents.get(0),
+            jhe.getHistoryEvent().getEventType());
+        expectedJobHistoryEvents.remove(0);
+      }  else if (current instanceof JobCounterUpdateEvent) {
+        JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
+
+        LOG.info("JobCounterUpdateEvent "
+            + jcue.getCounterUpdates().get(0).getCounterKey()
+            + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
+        if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+            JobCounter.NUM_FAILED_MAPS) {
+          totalFailedMaps += jcue.getCounterUpdates().get(0)
+              .getIncrementValue();
+        } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+            JobCounter.TOTAL_LAUNCHED_MAPS) {
+          totalLaunchedMaps += jcue.getCounterUpdates().get(0)
+              .getIncrementValue();
+        }
+      } else if (current instanceof JobTaskEvent) {
+        JobTaskEvent jte = (JobTaskEvent) current;
+        assertEquals(finalState, jte.getState());
+        jobTaskEventReceived = true;
+      }
+    }
+    assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
+    assertEquals("Did not process all expected JobHistoryEvents",
+        0, expectedJobHistoryEvents.size());
+    assertEquals("Expected Map Launches",
+        expectedMapLaunches, totalLaunchedMaps);
+    assertEquals("Expected Failed Maps",
+        expectedFailedMaps, totalFailedMaps);
+  }
+
+  private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
+
+    ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+
+    int partitions = 2;
+
+    Path remoteJobConfFile = mock(Path.class);
+    JobConf conf = new JobConf();
+    TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
+    Token<JobTokenIdentifier> jobToken =
+        (Token<JobTokenIdentifier>) mock(Token.class);
+    Credentials credentials = null;
+    Clock clock = new SystemClock();
+    int appAttemptId = 3;
+    MRAppMetrics metrics = mock(MRAppMetrics.class);
+    Resource minContainerRequirements = mock(Resource.class);
+    when(minContainerRequirements.getMemory()).thenReturn(1000);
+
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(
+        minContainerRequirements);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getClusterInfo()).thenReturn(clusterInfo);
+
+    TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+    MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
+        eh, remoteJobConfFile, conf,
+        taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
+        appAttemptId, metrics, appContext);
+    return mapTask;
+  }
+
+  private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
+      TaskAttemptState tas) {
+
+    ContainerId ci = mock(ContainerId.class);
+    Counters counters = mock(Counters.class);
+    TaskType tt = TaskType.MAP;
+
+    long finishTime = System.currentTimeMillis();
+
+    TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
+
+    when(mockTAinfo.getAttemptId()).thenReturn(tai);
+    when(mockTAinfo.getContainerId()).thenReturn(ci);
+    when(mockTAinfo.getCounters()).thenReturn(counters);
+    when(mockTAinfo.getError()).thenReturn("");
+    when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
+    when(mockTAinfo.getHostname()).thenReturn("localhost");
+    when(mockTAinfo.getHttpPort()).thenReturn(23);
+    when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
+    when(mockTAinfo.getPort()).thenReturn(24);
+    when(mockTAinfo.getRackname()).thenReturn("defaultRack");
+    when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
+    when(mockTAinfo.getShufflePort()).thenReturn(25);
+    when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
+    when(mockTAinfo.getStartTime()).thenReturn(finishTime - 10000L);
+    when(mockTAinfo.getState()).thenReturn("task in progress");
+    when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
+    when(mockTAinfo.getTaskType()).thenReturn(tt);
+    when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
+    return mockTAinfo;
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
@@ -1145,5 +1604,16 @@ public class TestRecovery {
   public static void main(String[] arg) throws Exception {
     TestRecovery test = new TestRecovery();
     test.testCrashed();
+    test.testMultipleCrashes();
+    test.testOutputRecovery();
+    test.testOutputRecoveryMapsOnly();
+    test.testRecoveryWithOldCommiter();
+    test.testSpeculative();
+    test.testRecoveryWithoutShuffleSecret();
+    test.testRecoverySuccessAttempt();
+    test.testRecoveryAllFailAttempts();
+    test.testRecoveryTaskSuccessAllAttemptsFail();
+    test.testRecoveryTaskSuccessAllAttemptsSucceed();
+    test.testRecoveryAllAttemptsKilled();
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Apr 11 05:02:45 2013
@@ -316,7 +316,8 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          isNewApiCommitter(), currentUser.getUserName(), getContext(),
+          getCommitter(), isNewApiCommitter(),
+          currentUser.getUserName(), getContext(),
           forcedState, diagnostic);
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Thu Apr 11 05:02:45 2013
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.JobAC
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
@@ -47,6 +49,7 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -57,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
@@ -69,7 +73,6 @@ import org.apache.hadoop.yarn.SystemCloc
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -133,7 +136,7 @@ public class TestJobImpl {
     JobImpl job = createStubbedJob(conf, dispatcher, 0);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    job.handle(new JobStartEvent(job.getID()));
     assertJobState(job, JobStateInternal.SUCCEEDED);
     dispatcher.stop();
     commitHandler.stop();
@@ -222,7 +225,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
@@ -284,7 +287,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
@@ -351,7 +354,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.FAIL_ABORT);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -388,7 +391,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.SETUP);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -428,7 +431,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -439,7 +442,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -450,7 +453,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -461,7 +464,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -472,7 +475,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -490,7 +493,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -501,7 +504,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -556,7 +559,7 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, new JobTokenSecretManager(), new Credentials(), null, null,
-        mrAppMetrics, true, null, 0, null, null, null, null);
+        mrAppMetrics, null, true, null, 0, null, null, null, null);
     InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
@@ -597,7 +600,7 @@ public class TestJobImpl {
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    job.handle(new JobStartEvent(jobId));
     assertJobState(job, JobStateInternal.FAILED);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
@@ -661,7 +664,7 @@ public class TestJobImpl {
     StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
-    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    job.handle(new JobStartEvent(job.getID()));
     assertJobState(job, JobStateInternal.RUNNING);
     return job;
   }
@@ -785,9 +788,9 @@ public class TestJobImpl {
         boolean newApiCommitter, String user, int numSplits) {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
-          new SystemClock(), null, MRAppMetrics.create(),
-          newApiCommitter, user, System.currentTimeMillis(), null, null, null,
-          null);
+          new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
+          MRAppMetrics.create(), null, newApiCommitter, user,
+          System.currentTimeMillis(), null, null, null, null);
 
       initTransition = getInitTransition(numSplits);
       localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Thu Apr 11 05:02:45 2013
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbi
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
@@ -80,7 +78,6 @@ public class TestTaskImpl {
   private Path remoteJobConfFile;
   private Credentials credentials;
   private Clock clock;
-  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
   private TaskImpl mockTask;
   private ApplicationId appId;
@@ -104,13 +101,12 @@ public class TestTaskImpl {
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
         TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken,
-        Credentials credentials, Clock clock,
-        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
+        Credentials credentials, Clock clock, int startCount,
         MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener,
           jobToken, credentials, clock,
-          completedTasksFromPreviousRun, startCount, metrics, appContext);
+          startCount, metrics, appContext);
       this.taskType = taskType;
     }
 
@@ -247,8 +243,7 @@ public class TestTaskImpl {
     return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, taskType);
+        startCount, metrics, appContext, taskType);
   }
 
   @After 
@@ -652,9 +647,7 @@ public class TestTaskImpl {
   public void testFailedTransitions() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
-        credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, TaskType.MAP) {
+        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
           @Override
           protected int getMaxAttempts() {
             return 1;
@@ -721,9 +714,7 @@ public class TestTaskImpl {
   public void testCountersWithSpeculation() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, jobToken,
-        credentials, clock,
-        completedTasksFromPreviousRun, startCount,
-        metrics, appContext, TaskType.MAP) {
+        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
           @Override
           protected int getMaxAttempts() {
             return 1;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Thu Apr 11 05:02:45 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import com.google.common.base.Joiner;
@@ -525,4 +526,19 @@ public class JobHistoryUtils {
     sb.append(jobId.toString());
     return sb.toString();
   }
+
+  public static Path getPreviousJobHistoryPath(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    String jobId =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+          .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
+    Path histDirPath = FileContext.getFileContext(conf).makeQualified(
+            new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+        histDirPath, jobId, (applicationAttemptId.getAttemptId() - 1)));
+  }
 }
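
For context: getPreviousJobHistoryPath() resolves the history file that the previous AM attempt wrote into the staging directory, so a restarted AM can replay it. A minimal caller sketch follows; the openPreviousHistory() wrapper below is hypothetical and not part of this commit, only getPreviousJobHistoryPath() is.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;

    public class PreviousHistorySketch {
      // Opens the history file left behind by attempt N-1 when running as
      // attempt N. The first attempt has no predecessor, so return null.
      static FSDataInputStream openPreviousHistory(Configuration conf,
          ApplicationAttemptId attemptId) throws IOException {
        if (attemptId.getAttemptId() <= 1) {
          return null;
        }
        Path histFile =
            JobHistoryUtils.getPreviousJobHistoryPath(conf, attemptId);
        FileContext fc = FileContext.getFileContext(histFile.toUri(), conf);
        return fc.open(histFile);
      }
    }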

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Apr 11 05:02:45 2013
@@ -422,6 +422,7 @@ public interface MRJobConfig {
   /** Enable job recovery.*/
   public static final String MR_AM_JOB_RECOVERY_ENABLE = 
     MR_AM_PREFIX + "job.recovery.enable";
+  public static final boolean MR_AM_JOB_RECOVERY_ENABLE_DEFAULT = true;
 
   /** 
    * Limit on the number of reducers that can be preempted to ensure that at
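
The new MR_AM_JOB_RECOVERY_ENABLE_DEFAULT constant supplies a shared default for the existing recovery key (yarn.app.mapreduce.am.job.recovery.enable). A sketch of the intended read pattern; the recoveryEnabled() wrapper is hypothetical, while the key and default come from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class RecoveryFlagSketch {
      // Recovery stays enabled unless the job explicitly sets
      // yarn.app.mapreduce.am.job.recovery.enable to false.
      static boolean recoveryEnabled(Configuration conf) {
        return conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
            MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
      }
    }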