You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2013/04/11 07:02:46 UTC
svn commit: r1466770 [2/2] - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Thu Apr 11 05:02:45 2013
@@ -18,10 +18,21 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import junit.framework.Assert;
@@ -31,36 +42,66 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.Event;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRecovery {
@@ -75,6 +116,7 @@ public class TestRecovery {
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
+
/**
* AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
* completely disappears because of failed launch, one attempt gets killed and
@@ -1011,6 +1053,423 @@ public class TestRecovery {
app.verifyCompleted();
}
+ @Test
+ public void testRecoverySuccessAttempt() {
+ LOG.info("--- START: testRecoverySuccessAttempt ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.SUCCEEDED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock (OutputCommitter.class);
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo,mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler,atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+ finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ jobHistoryEvents.add(EventType.TASK_FINISHED);
+ recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 1L);
+ }
+
+ @Test
+ public void testRecoveryAllFailAttempts() {
+ LOG.info("--- START: testRecoveryAllFailAttempts ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock (OutputCommitter.class);
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler,atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+ finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ jobHistoryEvents.add(EventType.TASK_FAILED);
+ recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 2L);
+ }
+
+ @Test
+ public void testRecoveryTaskSuccessAllAttemptsFail() {
+ LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.FAILED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock (OutputCommitter.class);
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler,atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
+ finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
+ // check for one new attempt launched since successful attempt not found
+ TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
+ finalAttemptStates.put(taId3, TaskAttemptState.NEW);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
+ recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 2L);
+ }
+
+ @Test
+ public void testRecoveryTaskSuccessAllAttemptsSucceed() {
+ LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.SUCCEEDED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.SUCCEEDED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock (OutputCommitter.class);
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler,atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+ finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+ jobHistoryEvents.add(EventType.TASK_FINISHED);
+ recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 0L);
+ }
+
+ @Test
+ public void testRecoveryAllAttemptsKilled() {
+ LOG.info("--- START: testRecoveryAllAttemptsKilled ---");
+
+ long clusterTimestamp = System.currentTimeMillis();
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+ mockEventHandler);
+
+ TaskId taskId = recoverMapTask.getID();
+ JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+ TaskID taskID = new TaskID(jobID,
+ org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+ //Mock up the TaskAttempts
+ Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+ new HashMap<TaskAttemptID, TaskAttemptInfo>();
+ TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+ TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+ TaskAttemptState.KILLED);
+ mockTaskAttempts.put(taId1, mockTAinfo1);
+
+ TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+ TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+ TaskAttemptState.KILLED);
+ mockTaskAttempts.put(taId2, mockTAinfo2);
+
+ OutputCommitter mockCommitter = mock (OutputCommitter.class);
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
+ when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+ when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+ recoverMapTask.handle(
+ new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler,atLeast(1)).handle(
+ (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+ new HashMap<TaskAttemptID, TaskAttemptState>();
+ finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
+ finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
+
+ List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+ jobHistoryEvents.add(EventType.TASK_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+ jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
+ jobHistoryEvents.add(EventType.TASK_FAILED);
+ recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
+ arg, jobHistoryEvents, 2L, 0L);
+ }
+
+ private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
+ Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
+ ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
+ long expectedMapLaunches, long expectedFailedMaps) {
+
+ assertEquals("Final State of Task", finalState, checkTask.getState());
+
+ Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
+ checkTask.getAttempts();
+ assertEquals("Expected Number of Task Attempts",
+ finalAttemptStates.size(), recoveredAttempts.size());
+ for (TaskAttemptID taID : finalAttemptStates.keySet()) {
+ assertEquals("Expected Task Attempt State",
+ finalAttemptStates.get(taID),
+ recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
+ }
+
+ Iterator<Event> ie = arg.getAllValues().iterator();
+ int eventNum = 0;
+ long totalLaunchedMaps = 0;
+ long totalFailedMaps = 0;
+ boolean jobTaskEventReceived = false;
+
+ while (ie.hasNext()) {
+ Object current = ie.next();
+ ++eventNum;
+ LOG.info(eventNum + " " + current.getClass().getName());
+ if (current instanceof JobHistoryEvent) {
+ JobHistoryEvent jhe = (JobHistoryEvent) current;
+ LOG.info(expectedJobHistoryEvents.get(0).toString() + " " +
+ jhe.getHistoryEvent().getEventType().toString() + " " +
+ jhe.getJobID());
+ assertEquals(expectedJobHistoryEvents.get(0),
+ jhe.getHistoryEvent().getEventType());
+ expectedJobHistoryEvents.remove(0);
+ } else if (current instanceof JobCounterUpdateEvent) {
+ JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
+
+ LOG.info("JobCounterUpdateEvent "
+ + jcue.getCounterUpdates().get(0).getCounterKey()
+ + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
+ if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+ JobCounter.NUM_FAILED_MAPS) {
+ totalFailedMaps += jcue.getCounterUpdates().get(0)
+ .getIncrementValue();
+ } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+ JobCounter.TOTAL_LAUNCHED_MAPS) {
+ totalLaunchedMaps += jcue.getCounterUpdates().get(0)
+ .getIncrementValue();
+ }
+ } else if (current instanceof JobTaskEvent) {
+ JobTaskEvent jte = (JobTaskEvent) current;
+ assertEquals(jte.getState(), finalState);
+ jobTaskEventReceived = true;
+ }
+ }
+ assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
+ assertEquals("Did not process all expected JobHistoryEvents",
+ 0, expectedJobHistoryEvents.size());
+ assertEquals("Expected Map Launches",
+ expectedMapLaunches, totalLaunchedMaps);
+ assertEquals("Expected Failed Maps",
+ expectedFailedMaps, totalFailedMaps);
+ }
+
+ private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
+
+ ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+
+ int partitions = 2;
+
+ Path remoteJobConfFile = mock(Path.class);
+ JobConf conf = new JobConf();
+ TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
+ Token<JobTokenIdentifier> jobToken =
+ (Token<JobTokenIdentifier>) mock(Token.class);
+ Credentials credentials = null;
+ Clock clock = new SystemClock();
+ int appAttemptId = 3;
+ MRAppMetrics metrics = mock(MRAppMetrics.class);
+ Resource minContainerRequirements = mock(Resource.class);
+ when(minContainerRequirements.getMemory()).thenReturn(1000);
+
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(
+ minContainerRequirements);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getClusterInfo()).thenReturn(clusterInfo);
+
+ TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+ MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
+ eh, remoteJobConfFile, conf,
+ taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
+ appAttemptId, metrics, appContext);
+ return mapTask;
+ }
+
+ private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
+ TaskAttemptState tas) {
+
+ ContainerId ci = mock(ContainerId.class);
+ Counters counters = mock(Counters.class);
+ TaskType tt = TaskType.MAP;
+
+ long finishTime = System.currentTimeMillis();
+
+ TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
+
+ when(mockTAinfo.getAttemptId()).thenReturn(tai);
+ when(mockTAinfo.getContainerId()).thenReturn(ci);
+ when(mockTAinfo.getCounters()).thenReturn(counters);
+ when(mockTAinfo.getError()).thenReturn("");
+ when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
+ when(mockTAinfo.getHostname()).thenReturn("localhost");
+ when(mockTAinfo.getHttpPort()).thenReturn(23);
+ when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
+ when(mockTAinfo.getPort()).thenReturn(24);
+ when(mockTAinfo.getRackname()).thenReturn("defaultRack");
+ when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
+ when(mockTAinfo.getShufflePort()).thenReturn(25);
+ when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
+ when(mockTAinfo.getStartTime()).thenReturn(finishTime -10000);
+ when(mockTAinfo.getState()).thenReturn("task in progress");
+ when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
+ when(mockTAinfo.getTaskType()).thenReturn(tt);
+ when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
+ return mockTAinfo;
+ }
+
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
@@ -1145,5 +1604,16 @@ public class TestRecovery {
public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery();
test.testCrashed();
+ test.testMultipleCrashes();
+ test.testOutputRecovery();
+ test.testOutputRecoveryMapsOnly();
+ test.testRecoveryWithOldCommiter();
+ test.testSpeculative();
+ test.testRecoveryWithoutShuffleSecret();
+ test.testRecoverySuccessAttempt();
+ test.testRecoveryAllFailAttempts();
+ test.testRecoveryTaskSuccessAllAttemptsFail();
+ test.testRecoveryTaskSuccessAllAttemptsSucceed();
+ test.testRecoveryAllAttemptsKilled();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Apr 11 05:02:45 2013
@@ -316,7 +316,8 @@ import org.junit.Test;
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ getCommitter(), isNewApiCommitter(),
+ currentUser.getUserName(), getContext(),
forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Thu Apr 11 05:02:45 2013
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.JobAC
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus.State;
@@ -47,6 +49,7 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -57,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
@@ -69,7 +73,6 @@ import org.apache.hadoop.yarn.SystemCloc
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -133,7 +136,7 @@ public class TestJobImpl {
JobImpl job = createStubbedJob(conf, dispatcher, 0);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+ job.handle(new JobStartEvent(job.getID()));
assertJobState(job, JobStateInternal.SUCCEEDED);
dispatcher.stop();
commitHandler.stop();
@@ -222,7 +225,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
@@ -284,7 +287,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
@@ -351,7 +354,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.FAIL_ABORT);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -388,7 +391,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@@ -428,7 +431,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -439,7 +442,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -450,7 +453,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -461,7 +464,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -472,7 +475,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
- null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@@ -490,7 +493,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, true, null, 0, null, null, null, null);
+ mrAppMetrics, null, true, null, 0, null, null, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
@@ -501,7 +504,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, true, null, 0, null, null, null, null);
+ mrAppMetrics, null, true, null, 0, null, null, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
@@ -556,7 +559,7 @@ public class TestJobImpl {
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, new JobTokenSecretManager(), new Credentials(), null, null,
- mrAppMetrics, true, null, 0, null, null, null, null);
+ mrAppMetrics, null, true, null, 0, null, null, null, null);
InitTransition initTransition = getInitTransition(2);
JobEvent mockJobEvent = mock(JobEvent.class);
initTransition.transition(job, mockJobEvent);
@@ -597,7 +600,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
@@ -661,7 +664,7 @@ public class TestJobImpl {
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
- job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+ job.handle(new JobStartEvent(job.getID()));
assertJobState(job, JobStateInternal.RUNNING);
return job;
}
@@ -785,9 +788,9 @@ public class TestJobImpl {
boolean newApiCommitter, String user, int numSplits) {
super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(),
- new SystemClock(), null, MRAppMetrics.create(),
- newApiCommitter, user, System.currentTimeMillis(), null, null, null,
- null);
+ new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
+ MRAppMetrics.create(), null, newApiCommitter, user,
+ System.currentTimeMillis(), null, null, null, null);
initTransition = getInitTransition(numSplits);
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Thu Apr 11 05:02:45 2013
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbi
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
@@ -80,7 +78,6 @@ public class TestTaskImpl {
private Path remoteJobConfFile;
private Credentials credentials;
private Clock clock;
- private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private MRAppMetrics metrics;
private TaskImpl mockTask;
private ApplicationId appId;
@@ -104,13 +101,12 @@ public class TestTaskImpl {
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
- Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
+ Credentials credentials, Clock clock, int startCount,
MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
super(jobId, taskType , partition, eventHandler,
remoteJobConfFile, conf, taskAttemptListener,
jobToken, credentials, clock,
- completedTasksFromPreviousRun, startCount, metrics, appContext);
+ startCount, metrics, appContext);
this.taskType = taskType;
}
@@ -247,8 +243,7 @@ public class TestTaskImpl {
return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock,
- completedTasksFromPreviousRun, startCount,
- metrics, appContext, taskType);
+ startCount, metrics, appContext, taskType);
}
@After
@@ -652,9 +647,7 @@ public class TestTaskImpl {
public void testFailedTransitions() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
- credentials, clock,
- completedTasksFromPreviousRun, startCount,
- metrics, appContext, TaskType.MAP) {
+ credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;
@@ -721,9 +714,7 @@ public class TestTaskImpl {
public void testCountersWithSpeculation() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
- credentials, clock,
- completedTasksFromPreviousRun, startCount,
- metrics, appContext, TaskType.MAP) {
+ credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Thu Apr 11 05:02:45 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.google.common.base.Joiner;
@@ -525,4 +526,19 @@ public class JobHistoryUtils {
sb.append(jobId.toString());
return sb.toString();
}
+
+ public static Path getPreviousJobHistoryPath(
+ Configuration conf, ApplicationAttemptId applicationAttemptId)
+ throws IOException {
+ String jobId =
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+ .toString();
+ String jobhistoryDir =
+ JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
+ Path histDirPath = FileContext.getFileContext(conf).makeQualified(
+ new Path(jobhistoryDir));
+ FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+ return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+ histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Apr 11 05:02:45 2013
@@ -422,6 +422,7 @@ public interface MRJobConfig {
/** Enable job recovery.*/
public static final String MR_AM_JOB_RECOVERY_ENABLE =
MR_AM_PREFIX + "job.recovery.enable";
+ public static final boolean MR_AM_JOB_RECOVERY_ENABLE_DEFAULT = true;
/**
* Limit on the number of reducers that can be preempted to ensure that at