You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2013/04/11 07:02:46 UTC
svn commit: r1466770 [1/2] - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Author: sseth
Date: Thu Apr 11 05:02:45 2013
New Revision: 1466770
URL: http://svn.apache.org/r1466770
Log:
merge MAPREDUCE-5079 from trunk. Changes job recovery to restore state directly from job history, instead of simulating state machine events. Contributed by Jason Lowe and Robert Parker.
Added:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
- copied unchanged from r1466767, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java
- copied unchanged from r1466767, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java
- copied unchanged from r1466767, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java
Removed:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Apr 11 05:02:45 2013
@@ -23,6 +23,10 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
faster job submission. (amarrk via tgraves)
+ MAPREDUCE-5079. Changes job recovery to restore state directly from job
+ history, instaed of simulating state machine events.
+ (Jason Lowe and Robert Parker via sseth)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Apr 11 05:02:45 2013
@@ -24,9 +24,12 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
@@ -46,6 +49,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
@@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -61,6 +68,7 @@ import org.apache.hadoop.mapreduce.task.
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
@@ -74,6 +82,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -84,8 +93,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
-import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
@@ -94,6 +101,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -167,7 +175,6 @@ public class MRAppMaster extends Composi
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
- private Recovery recoveryServ;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private EventHandler<CommitterEvent> committerEventHandler;
@@ -180,7 +187,6 @@ public class MRAppMaster extends Composi
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
- private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private Job job;
@@ -193,6 +199,8 @@ public class MRAppMaster extends Composi
private String shutDownMessage = null;
JobStateInternal forcedState = null;
+ private long recoveredJobStartTime = 0;
+
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime, int maxAppAttempts) {
@@ -340,34 +348,9 @@ public class MRAppMaster extends Composi
}
} else {
committer = createOutputCommitter(conf);
- boolean recoveryEnabled = conf.getBoolean(
- MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
- boolean recoverySupportedByCommitter = committer.isRecoverySupported();
-
- // If a shuffle secret was not provided by the job client then this app
- // attempt will generate one. However that disables recovery if there
- // are reducers as the shuffle secret would be app attempt specific.
- boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
- TokenCache.getShuffleSecretKey(fsTokens) != null);
-
- if (recoveryEnabled && recoverySupportedByCommitter
- && shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) {
- LOG.info("Recovery is enabled. "
- + "Will try to recover from previous life on best effort basis.");
- recoveryServ = createRecoveryService(context);
- addIfService(recoveryServ);
- dispatcher = recoveryServ.getDispatcher();
- clock = recoveryServ.getClock();
- inRecovery = true;
- } else {
- LOG.info("Not starting RecoveryService: recoveryEnabled: "
- + recoveryEnabled + " recoverySupportedByCommitter: "
- + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
- + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
- + appAttemptID.getAttemptId());
- dispatcher = createDispatcher();
- addIfService(dispatcher);
- }
+
+ dispatcher = createDispatcher();
+ addIfService(dispatcher);
//service to handle requests from JobClient
clientService = createClientService(context);
@@ -595,15 +578,6 @@ public class MRAppMaster extends Composi
return new JobFinishEventHandler();
}
- /**
- * Create the recovery service.
- * @return an instance of the recovery service.
- */
- protected Recovery createRecoveryService(AppContext appContext) {
- return new RecoveryService(appContext.getApplicationAttemptId(),
- appContext.getClock(), getCommitter(), isNewApiCommitter());
- }
-
/** Create and initialize (but don't start) a single job.
* @param forcedState a state to force the job into or null for normal operation.
* @param diagnostic a diagnostic message to include with the job.
@@ -615,7 +589,8 @@ public class MRAppMaster extends Composi
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
- completedTasksFromPreviousRun, metrics, newApiCommitter,
+ completedTasksFromPreviousRun, metrics,
+ committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context,
forcedState, diagnostic);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@@ -978,18 +953,8 @@ public class MRAppMaster extends Composi
public void start() {
amInfos = new LinkedList<AMInfo>();
-
- // Pull completedTasks etc from recovery
- if (inRecovery) {
- completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
- amInfos = recoveryServ.getAMInfos();
- } else {
- // Get the amInfos anyways irrespective of whether recovery is enabled or
- // not IF this is not the first AM generation
- if (appAttemptID.getAttemptId() != 1) {
- amInfos.addAll(readJustAMInfos());
- }
- }
+ completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
+ processRecovery();
// Current an AMInfo for the current AM generation.
AMInfo amInfo =
@@ -1051,13 +1016,105 @@ public class MRAppMaster extends Composi
startJobs();
}
+ private void processRecovery() {
+ if (appAttemptID.getAttemptId() == 1) {
+ return; // no need to recover on the first attempt
+ }
+
+ boolean recoveryEnabled = getConfig().getBoolean(
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
+ boolean recoverySupportedByCommitter =
+ committer != null && committer.isRecoverySupported();
+
+ // If a shuffle secret was not provided by the job client then this app
+ // attempt will generate one. However that disables recovery if there
+ // are reducers as the shuffle secret would be app attempt specific.
+ int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+ boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
+ TokenCache.getShuffleSecretKey(fsTokens) != null);
+
+ if (recoveryEnabled && recoverySupportedByCommitter
+ && shuffleKeyValidForRecovery) {
+ LOG.info("Recovery is enabled. "
+ + "Will try to recover from previous life on best effort basis.");
+ try {
+ parsePreviousJobHistory();
+ } catch (IOException e) {
+ LOG.warn("Unable to parse prior job history, aborting recovery", e);
+ // try to get just the AMInfos
+ amInfos.addAll(readJustAMInfos());
+ }
+ } else {
+ LOG.info("Will not try to recover. recoveryEnabled: "
+ + recoveryEnabled + " recoverySupportedByCommitter: "
+ + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+ + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+ + appAttemptID.getAttemptId());
+ // Get the amInfos anyways whether recovery is enabled or not
+ amInfos.addAll(readJustAMInfos());
+ }
+ }
+
+ private static FSDataInputStream getPreviousJobHistoryStream(
+ Configuration conf, ApplicationAttemptId appAttemptId)
+ throws IOException {
+ Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf,
+ appAttemptId);
+ LOG.info("Previous history file is at " + historyFile);
+ return historyFile.getFileSystem(conf).open(historyFile);
+ }
+
+ private void parsePreviousJobHistory() throws IOException {
+ FSDataInputStream in = getPreviousJobHistoryStream(getConfig(),
+ appAttemptID);
+ JobHistoryParser parser = new JobHistoryParser(in);
+ JobInfo jobInfo = parser.parse();
+ Exception parseException = parser.getParseException();
+ if (parseException != null) {
+ LOG.info("Got an error parsing job-history file" +
+ ", ignoring incomplete events.", parseException);
+ }
+ Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+ .getAllTasks();
+ for (TaskInfo taskInfo : taskInfos.values()) {
+ if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+ Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+ taskInfo.getAllTaskAttempts().entrySet().iterator();
+ while (taskAttemptIterator.hasNext()) {
+ Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+ if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+ taskAttemptIterator.remove();
+ }
+ }
+ completedTasksFromPreviousRun
+ .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+ LOG.info("Read from history task "
+ + TypeConverter.toYarn(taskInfo.getTaskId()));
+ }
+ }
+ LOG.info("Read completed tasks from history "
+ + completedTasksFromPreviousRun.size());
+ recoveredJobStartTime = jobInfo.getLaunchTime();
+
+ // recover AMInfos
+ List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos();
+ if (jhAmInfoList != null) {
+ for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) {
+ AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+ jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+ jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+ jhAmInfo.getNodeManagerHttpPort());
+ amInfos.add(amInfo);
+ }
+ }
+ }
+
private List<AMInfo> readJustAMInfos() {
List<AMInfo> amInfos = new ArrayList<AMInfo>();
FSDataInputStream inputStream = null;
try {
- inputStream =
- RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
- appAttemptID);
+ inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
EventReader jobHistoryEventReader = new EventReader(inputStream);
// All AMInfos are contiguous. Track when the first AMStartedEvent
@@ -1108,7 +1165,8 @@ public class MRAppMaster extends Composi
@SuppressWarnings("unchecked")
protected void startJobs() {
/** create a job-start event to get this ball rolling */
- JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
+ JobEvent startJobEvent = new JobStartEvent(job.getID(),
+ recoveredJobStartTime);
/** send the job-start event. this triggers the job execution. */
dispatcher.getEventHandler().handle(startJobEvent);
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java Thu Apr 11 05:02:45 2013
@@ -26,6 +26,7 @@ public enum TaskAttemptEventType {
//Producer:Task
TA_SCHEDULE,
TA_RESCHEDULE,
+ TA_RECOVER,
//Producer:Client, Task
TA_KILL,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java Thu Apr 11 05:02:45 2013
@@ -28,6 +28,7 @@ public enum TaskEventType {
//Producer:Job
T_SCHEDULE,
+ T_RECOVER,
//Producer:Speculator
T_ADD_SPEC_ATTEMPT,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Apr 11 05:02:45 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -92,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@@ -101,6 +103,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -159,6 +162,7 @@ public class JobImpl implements org.apac
private final Lock writeLock;
private final JobId jobId;
private final String jobName;
+ private final OutputCommitter committer;
private final boolean newApiCommitter;
private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
@@ -602,7 +606,7 @@ public class JobImpl implements org.apac
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
- boolean newApiCommitter, String userName,
+ OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
JobStateInternal forcedState, String forcedDiagnostic) {
this.applicationAttemptId = applicationAttemptId;
@@ -618,6 +622,7 @@ public class JobImpl implements org.apac
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
this.appSubmitTime = appSubmitTime;
this.oldJobId = TypeConverter.fromYarn(jobId);
+ this.committer = committer;
this.newApiCommitter = newApiCommitter;
this.taskAttemptListener = taskAttemptListener;
@@ -888,10 +893,16 @@ public class JobImpl implements org.apac
}
}
- protected void scheduleTasks(Set<TaskId> taskIDs) {
+ protected void scheduleTasks(Set<TaskId> taskIDs,
+ boolean recoverTaskOutput) {
for (TaskId taskID : taskIDs) {
- eventHandler.handle(new TaskEvent(taskID,
- TaskEventType.T_SCHEDULE));
+ TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
+ if (taskInfo != null) {
+ eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
+ committer, recoverTaskOutput));
+ } else {
+ eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
+ }
}
}
@@ -1421,7 +1432,7 @@ public class JobImpl implements org.apac
job.conf, splits[i],
job.taskAttemptListener,
job.jobToken, job.fsTokens,
- job.clock, job.completedTasksFromPreviousRun,
+ job.clock,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
@@ -1439,7 +1450,6 @@ public class JobImpl implements org.apac
job.conf, job.numMapTasks,
job.taskAttemptListener, job.jobToken,
job.fsTokens, job.clock,
- job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
@@ -1475,8 +1485,8 @@ public class JobImpl implements org.apac
@Override
public void transition(JobImpl job, JobEvent event) {
job.setupProgress = 1.0f;
- job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
- job.scheduleTasks(job.reduceTasks);
+ job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
+ job.scheduleTasks(job.reduceTasks, true);
// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
@@ -1507,7 +1517,12 @@ public class JobImpl implements org.apac
*/
@Override
public void transition(JobImpl job, JobEvent event) {
- job.startTime = job.clock.getTime();
+ JobStartEvent jse = (JobStartEvent) event;
+ if (jse.getRecoveredJobStartTime() != 0) {
+ job.startTime = jse.getRecoveredJobStartTime();
+ } else {
+ job.startTime = job.clock.getTime();
+ }
JobInitedEvent jie =
new JobInitedEvent(job.oldJobId,
job.startTime,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Thu Apr 11 05:02:45 2013
@@ -18,17 +18,13 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
-import java.util.Map;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -49,11 +45,10 @@ public class MapTaskImpl extends TaskImp
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
- MRAppMetrics metrics, AppContext appContext) {
+ int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
conf, taskAttemptListener, jobToken, credentials, clock,
- completedTasksFromPreviousRun, startCount, metrics, appContext);
+ appAttemptId, metrics, appContext);
this.taskSplitMetaInfo = taskSplitMetaInfo;
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Thu Apr 11 05:02:45 2013
@@ -18,16 +18,12 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
-import java.util.Map;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -47,11 +43,10 @@ public class ReduceTaskImpl extends Task
int numMapTasks, TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
- MRAppMetrics metrics, AppContext appContext) {
+ int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
taskAttemptListener, jobToken, credentials, clock,
- completedTasksFromPreviousRun, startCount, metrics, appContext);
+ appAttemptId, metrics, appContext);
this.numMapTasks = numMapTasks;
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Apr 11 05:02:45 2013
@@ -56,10 +56,12 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
@@ -89,6 +91,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@@ -205,6 +208,11 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
.addTransition(TaskAttemptStateInternal.NEW,
+ EnumSet.of(TaskAttemptStateInternal.FAILED,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.SUCCEEDED),
+ TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
+ .addTransition(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.NEW,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -1082,6 +1090,102 @@ public abstract class TaskAttemptImpl im
this.avataar = avataar;
}
+ /**
+ * Recover this attempt's state from a prior application attempt using
+ * data parsed from the job history file.
+ * @param taInfo recovered history info for this attempt
+ * @param committer committer used to recover or abort the attempt's output
+ * @param recoverOutput whether to attempt output recovery for a
+ * successfully completed attempt
+ * @return the internal state the attempt was recovered to
+ */
+ @SuppressWarnings("unchecked")
+ public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
+ OutputCommitter committer, boolean recoverOutput) {
+ containerID = taInfo.getContainerId();
+ containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+ + taInfo.getPort());
+ containerMgrAddress = StringInterner.weakIntern(
+ containerNodeId.toString());
+ nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+ + taInfo.getHttpPort());
+ computeRackAndLocality();
+ launchTime = taInfo.getStartTime();
+ finishTime = (taInfo.getFinishTime() != -1) ?
+ taInfo.getFinishTime() : clock.getTime();
+ shufflePort = taInfo.getShufflePort();
+ trackerName = taInfo.getHostname();
+ httpPort = taInfo.getHttpPort();
+ sendLaunchedEvents();
+
+ // report the attempt as fully complete; phase/progress reflect a
+ // finished attempt regardless of the recovered terminal state
+ reportedStatus.id = attemptId;
+ reportedStatus.progress = 1.0f;
+ reportedStatus.counters = taInfo.getCounters();
+ reportedStatus.stateString = taInfo.getState();
+ reportedStatus.phase = Phase.CLEANUP;
+ reportedStatus.mapFinishTime = taInfo.getMapFinishTime();
+ reportedStatus.shuffleFinishTime = taInfo.getShuffleFinishTime();
+ reportedStatus.sortFinishTime = taInfo.getSortFinishTime();
+ addDiagnosticInfo(taInfo.getError());
+
+ boolean needToClean = false;
+ String recoveredState = taInfo.getTaskStatus();
+ if (recoverOutput
+ && TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
+ TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attemptId));
+ try {
+ committer.recoverTask(tac);
+ LOG.info("Recovered output from task attempt " + attemptId);
+ } catch (Exception e) {
+ // output could not be recovered, so downgrade the attempt to KILLED
+ // rather than trust a SUCCEEDED attempt with missing output
+ LOG.error("Unable to recover task attempt " + attemptId, e);
+ LOG.info("Task attempt " + attemptId + " will be recovered as KILLED");
+ recoveredState = TaskAttemptState.KILLED.toString();
+ needToClean = true;
+ }
+ }
+
+ TaskAttemptStateInternal attemptState;
+ if (TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
+ attemptState = TaskAttemptStateInternal.SUCCEEDED;
+ reportedStatus.taskState = TaskAttemptState.SUCCEEDED;
+ eventHandler.handle(createJobCounterUpdateEventTASucceeded(this));
+ logAttemptFinishedEvent(attemptState);
+ } else if (TaskAttemptState.FAILED.toString().equals(recoveredState)) {
+ attemptState = TaskAttemptStateInternal.FAILED;
+ reportedStatus.taskState = TaskAttemptState.FAILED;
+ eventHandler.handle(createJobCounterUpdateEventTAFailed(this, false));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(this,
+ TaskAttemptStateInternal.FAILED);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
+ } else {
+ // anything that is not SUCCEEDED/FAILED/KILLED (including an empty
+ // status from an incomplete attempt) is recovered as KILLED
+ if (!TaskAttemptState.KILLED.toString().equals(recoveredState)) {
+ if (String.valueOf(recoveredState).isEmpty()) {
+ LOG.info("TaskAttempt " + attemptId
+ + " had not completed, recovering as KILLED");
+ } else {
+ LOG.warn("TaskAttempt " + attemptId + " found in unexpected state "
+ + recoveredState + ", recovering as KILLED");
+ }
+ addDiagnosticInfo("Killed during application recovery");
+ needToClean = true;
+ }
+ attemptState = TaskAttemptStateInternal.KILLED;
+ reportedStatus.taskState = TaskAttemptState.KILLED;
+ eventHandler.handle(createJobCounterUpdateEventTAKilled(this, false));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(this,
+ TaskAttemptStateInternal.KILLED);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
+ }
+
+ if (needToClean) {
+ TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attemptId));
+ try {
+ committer.abortTask(tac);
+ } catch (Exception e) {
+ // best-effort cleanup; failure to abort should not fail recovery
+ LOG.warn("Task cleanup failed for attempt " + attemptId, e);
+ }
+ }
+
+ return attemptState;
+ }
+
private static TaskAttemptState getExternalState(
TaskAttemptStateInternal smState) {
switch (smState) {
@@ -1122,6 +1226,24 @@ public abstract class TaskAttemptImpl im
}
}
+ /**
+ * Resolve the rack of the assigned container's node and classify this
+ * attempt's data locality as NODE_LOCAL, RACK_LOCAL, or OFF_SWITCH.
+ * Requires containerNodeId to have been set by the caller.
+ */
+ private void computeRackAndLocality() {
+ nodeRackName = RackResolver.resolve(
+ containerNodeId.getHost()).getNetworkLocation();
+
+ locality = Locality.OFF_SWITCH;
+ if (dataLocalHosts.size() > 0) {
+ String cHost = resolveHost(containerNodeId.getHost());
+ if (dataLocalHosts.contains(cHost)) {
+ locality = Locality.NODE_LOCAL;
+ }
+ }
+ // only check rack locality if the node itself was not data-local
+ if (locality == Locality.OFF_SWITCH) {
+ if (dataLocalRacks.contains(nodeRackName)) {
+ locality = Locality.RACK_LOCAL;
+ }
+ }
+ }
+
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
int slotMemoryReq =
@@ -1141,6 +1263,18 @@ public abstract class TaskAttemptImpl im
return slotMillisIncrement;
}
+ /**
+ * Build the job counter update crediting this attempt's slot-milliseconds
+ * to SLOTS_MILLIS_MAPS or SLOTS_MILLIS_REDUCES based on the task type.
+ * Shared by the normal success transition and history-based recovery.
+ */
+ private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
+ TaskAttemptImpl taskAttempt) {
+ long slotMillis = computeSlotMillis(taskAttempt);
+ TaskId taskId = taskAttempt.attemptId.getTaskId();
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
+ jce.addCounterUpdate(
+ taskId.getTaskType() == TaskType.MAP ?
+ JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
+ slotMillis);
+ return jce;
+ }
+
private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
@@ -1210,6 +1344,26 @@ public abstract class TaskAttemptImpl im
return tauce;
}
+ @SuppressWarnings("unchecked")
+ private void sendLaunchedEvents() {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
+ .getJobId());
+ jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ?
+ JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1);
+ eventHandler.handle(jce);
+
+ LOG.info("TaskAttempt: [" + attemptId
+ + "] using containerId: [" + containerID + " on NM: ["
+ + containerMgrAddress + "]");
+ TaskAttemptStartedEvent tase =
+ new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ launchTime, trackerName, httpPort, shufflePort, containerID,
+ locality.toString(), avataar.toString());
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
+ }
+
private WrappedProgressSplitsBlock getProgressSplitBlock() {
readLock.lock();
try {
@@ -1342,8 +1496,6 @@ public abstract class TaskAttemptImpl im
taskAttempt.containerNodeId.toString());
taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
cEvent.getContainer().getNodeHttpAddress());
- taskAttempt.nodeRackName = RackResolver.resolve(
- taskAttempt.containerNodeId.getHost()).getNetworkLocation();
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
taskAttempt.assignedCapability = cEvent.getContainer().getResource();
// this is a _real_ Task (classic Hadoop mapred flavor):
@@ -1354,19 +1506,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.taskAttemptListener.registerPendingTask(
taskAttempt.remoteTask, taskAttempt.jvmID);
- taskAttempt.locality = Locality.OFF_SWITCH;
- if (taskAttempt.dataLocalHosts.size() > 0) {
- String cHost = taskAttempt.resolveHost(
- taskAttempt.containerNodeId.getHost());
- if (taskAttempt.dataLocalHosts.contains(cHost)) {
- taskAttempt.locality = Locality.NODE_LOCAL;
- }
- }
- if (taskAttempt.locality == Locality.OFF_SWITCH) {
- if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
- taskAttempt.locality = Locality.RACK_LOCAL;
- }
- }
+ taskAttempt.computeRackAndLocality();
//launch the container
//create the container object to be launched for a given Task attempt
@@ -1471,27 +1611,7 @@ public abstract class TaskAttemptImpl im
// Costly?
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
- JobCounterUpdateEvent jce =
- new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
- .getJobId());
- jce.addCounterUpdate(
- taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
- JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
- , 1);
- taskAttempt.eventHandler.handle(jce);
-
- LOG.info("TaskAttempt: [" + taskAttempt.attemptId
- + "] using containerId: [" + taskAttempt.containerID + " on NM: ["
- + taskAttempt.containerMgrAddress + "]");
- TaskAttemptStartedEvent tase =
- new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
- taskAttempt.launchTime,
- nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
- taskAttempt.shufflePort, taskAttempt.containerID,
- taskAttempt.locality.toString(), taskAttempt.avataar.toString());
- taskAttempt.eventHandler.handle
- (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
+ taskAttempt.sendLaunchedEvents();
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
@@ -1540,14 +1660,8 @@ public abstract class TaskAttemptImpl im
TaskAttemptEvent event) {
//set the finish time
taskAttempt.setFinishTime();
- long slotMillis = computeSlotMillis(taskAttempt);
- TaskId taskId = taskAttempt.attemptId.getTaskId();
- JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
- jce.addCounterUpdate(
- taskId.getTaskType() == TaskType.MAP ?
- JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
- slotMillis);
- taskAttempt.eventHandler.handle(jce);
+ taskAttempt.eventHandler.handle(
+ createJobCounterUpdateEventTASucceeded(taskAttempt));
taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
@@ -1585,6 +1699,18 @@ public abstract class TaskAttemptImpl im
}
}
+ /**
+ * Handles TA_RECOVER in the NEW state by delegating to
+ * {@link TaskAttemptImpl#recover}; the attempt moves directly to the
+ * terminal state (SUCCEEDED/FAILED/KILLED) returned by recovery.
+ */
+ private static class RecoverTransition implements
+ MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+
+ @Override
+ public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
+ return taskAttempt.recover(tare.getTaskAttemptInfo(),
+ tare.getCommitter(), tare.getRecoverOutput());
+ }
+ }
+
@SuppressWarnings({ "unchecked" })
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
//Log finished events only if an attempt started.
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Apr 11 05:02:45 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
@@ -37,7 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@@ -69,8 +70,10 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@@ -152,6 +155,12 @@ public abstract class TaskImpl implement
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
TaskEventType.T_KILL, new KillNewTransition())
+ .addTransition(TaskStateInternal.NEW,
+ EnumSet.of(TaskStateInternal.FAILED,
+ TaskStateInternal.KILLED,
+ TaskStateInternal.RUNNING,
+ TaskStateInternal.SUCCEEDED),
+ TaskEventType.T_RECOVER, new RecoverTransition())
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
@@ -250,20 +259,16 @@ public abstract class TaskImpl implement
// By default, the next TaskAttempt number is zero. Changes during recovery
protected int nextAttemptNumber = 0;
- private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
- new ArrayList<TaskAttemptInfo>();
- private static final class RecoverdAttemptsComparator implements
- Comparator<TaskAttemptInfo> {
- @Override
- public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
- long diff = attempt1.getStartTime() - attempt2.getStartTime();
- return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
- }
- }
-
- private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
- new RecoverdAttemptsComparator();
+ // Orders recovered task attempts by finish time (ascending) so that
+ // task attempt completion events are replayed in the order the attempts
+ // actually finished.
+ private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
+ new Comparator<TaskAttemptInfo>() {
+ @Override
+ public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
+ // sign of the long difference, clamped to -1/0/1 (avoids a
+ // narrowing cast of the long diff to int)
+ long diff = a.getFinishTime() - b.getFinishTime();
+ return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+ }
+ };
@Override
public TaskState getState() {
@@ -280,8 +285,7 @@ public abstract class TaskImpl implement
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
- MRAppMetrics metrics, AppContext appContext) {
+ int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
@@ -307,41 +311,15 @@ public abstract class TaskImpl implement
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
- // See if this is from a previous generation.
- if (completedTasksFromPreviousRun != null
- && completedTasksFromPreviousRun.containsKey(taskId)) {
- // This task has TaskAttempts from previous generation. We have to replay
- // them.
- LOG.info("Task is from previous run " + taskId);
- TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
- Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
- taskInfo.getAllTaskAttempts();
- taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
- taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
- Collections.sort(taskAttemptsFromPreviousGeneration,
- RECOVERED_ATTEMPTS_COMPARATOR);
- }
-
- if (taskAttemptsFromPreviousGeneration.isEmpty()) {
- // All the previous attempts are exhausted, now start with a new
- // generation.
-
- // All the new TaskAttemptIDs are generated based on MR
- // ApplicationAttemptID so that attempts from previous lives don't
- // over-step the current one. This assumes that a task won't have more
- // than 1000 attempts in its single generation, which is very reasonable.
- // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
- // and requires serious medical attention.
- nextAttemptNumber = (startCount - 1) * 1000;
- } else {
- // There are still some TaskAttempts from previous generation, use them
- nextAttemptNumber =
- taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
- }
-
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
+
+ // All the new TaskAttemptIDs are generated based on MR
+ // ApplicationAttemptID so that attempts from previous lives don't
+ // over-step the current one. This assumes that a task won't have more
+ // than 1000 attempts in its single generation, which is very reasonable.
+ nextAttemptNumber = (appAttemptId - 1) * 1000;
}
@Override
@@ -600,14 +578,28 @@ public abstract class TaskImpl implement
// This is always called in the Write Lock
private void addAndScheduleAttempt(Avataar avataar) {
- TaskAttempt attempt = createAttempt();
- ((TaskAttemptImpl) attempt).setAvataar(avataar);
+ TaskAttempt attempt = addAttempt(avataar);
+ inProgressAttempts.add(attempt.getID());
+ //schedule the nextAttemptNumber
+ if (failedAttempts.size() > 0) {
+ eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_RESCHEDULE));
+ } else {
+ eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_SCHEDULE));
+ }
+ }
+
+ private TaskAttemptImpl addAttempt(Avataar avataar) {
+ TaskAttemptImpl attempt = createAttempt();
+ attempt.setAvataar(avataar);
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
}
switch (attempts.size()) {
case 0:
- attempts = Collections.singletonMap(attempt.getID(), attempt);
+ attempts = Collections.singletonMap(attempt.getID(),
+ (TaskAttempt) attempt);
break;
case 1:
@@ -623,24 +615,8 @@ public abstract class TaskImpl implement
break;
}
- // Update nextATtemptNumber
- if (taskAttemptsFromPreviousGeneration.isEmpty()) {
- ++nextAttemptNumber;
- } else {
- // There are still some TaskAttempts from previous generation, use them
- nextAttemptNumber =
- taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
- }
-
- inProgressAttempts.add(attempt.getID());
- //schedule the nextAttemptNumber
- if (failedAttempts.size() > 0) {
- eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_RESCHEDULE));
- } else {
- eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_SCHEDULE));
- }
+ ++nextAttemptNumber;
+ return attempt;
}
@Override
@@ -705,6 +681,16 @@ public abstract class TaskImpl implement
}
}
+ /**
+ * Log a TaskStartedEvent to job history and record that a start event
+ * was generated (finish events are only logged when a start event was).
+ * Used by both initial scheduling and history-based recovery.
+ */
+ private void sendTaskStartedEvent() {
+ TaskStartedEvent tse = new TaskStartedEvent(
+ TypeConverter.fromYarn(taskId), getLaunchTime(),
+ TypeConverter.fromYarn(taskId.getTaskType()),
+ getSplitsAsString());
+ eventHandler
+ .handle(new JobHistoryEvent(taskId.getJobId(), tse));
+ historyTaskStartGenerated = true;
+ }
+
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
@@ -740,6 +726,16 @@ public abstract class TaskImpl implement
task.successfulAttempt = null;
}
+ /**
+ * Notify the job that this task succeeded and, if a start event was
+ * previously logged, record a TaskFinishedEvent in job history.
+ * Used by both the normal success transition and recovery.
+ */
+ private void sendTaskSucceededEvents() {
+ eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+ LOG.info("Task succeeded with attempt " + successfulAttempt);
+ if (historyTaskStartGenerated) {
+ TaskFinishedEvent tfe = createTaskFinishedEvent(this,
+ TaskStateInternal.SUCCEEDED);
+ eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+ }
+ }
+
/**
* @return a String representation of the splits.
*
@@ -751,6 +747,122 @@ public abstract class TaskImpl implement
return "";
}
+ /**
+ * Recover a completed task from a previous application attempt
+ * @param taskInfo recovered info about the task
+ * @param committer committer used to recover or abort attempt outputs
+ * @param recoverTaskOutput whether to recover task outputs
+ * @return state of the task after recovery
+ */
+ private TaskStateInternal recover(TaskInfo taskInfo,
+ OutputCommitter committer, boolean recoverTaskOutput) {
+ LOG.info("Recovering task " + taskId
+ + " from prior app attempt, status was " + taskInfo.getTaskStatus());
+
+ scheduledTime = taskInfo.getStartTime();
+ sendTaskStartedEvent();
+ Collection<TaskAttemptInfo> attemptInfos =
+ taskInfo.getAllTaskAttempts().values();
+
+ if (attemptInfos.size() > 0) {
+ metrics.launchedTask(this);
+ }
+
+ // recover the attempts for this task in the order they finished
+ // so task attempt completion events are ordered properly
+ int savedNextAttemptNumber = nextAttemptNumber;
+ ArrayList<TaskAttemptInfo> taInfos =
+ new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
+ Collections.sort(taInfos, TA_INFO_COMPARATOR);
+ for (TaskAttemptInfo taInfo : taInfos) {
+ // reuse the attempt id from the prior generation for this attempt
+ nextAttemptNumber = taInfo.getAttemptId().getId();
+ TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN);
+ // handle the recovery inline so attempts complete before task does
+ attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
+ committer, recoverTaskOutput));
+ finishedAttempts.add(attempt.getID());
+ TaskAttemptCompletionEventStatus taces = null;
+ TaskAttemptState attemptState = attempt.getState();
+ switch (attemptState) {
+ case FAILED:
+ taces = TaskAttemptCompletionEventStatus.FAILED;
+ break;
+ case KILLED:
+ taces = TaskAttemptCompletionEventStatus.KILLED;
+ break;
+ case SUCCEEDED:
+ taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
+ break;
+ default:
+ throw new IllegalStateException(
+ "Unexpected attempt state during recovery: " + attemptState);
+ }
+ if (attemptState == TaskAttemptState.FAILED) {
+ failedAttempts.add(attempt.getID());
+ if (failedAttempts.size() >= maxAttempts) {
+ taces = TaskAttemptCompletionEventStatus.TIPFAILED;
+ }
+ }
+
+ // don't clobber the successful attempt completion event
+ // TODO: this shouldn't be necessary after MAPREDUCE-4330
+ if (successfulAttempt == null) {
+ handleTaskAttemptCompletion(attempt.getID(), taces);
+ if (attemptState == TaskAttemptState.SUCCEEDED) {
+ successfulAttempt = attempt.getID();
+ }
+ }
+ }
+ // restore the counter so new attempts don't collide with recovered ids
+ nextAttemptNumber = savedNextAttemptNumber;
+
+ TaskStateInternal taskState = TaskStateInternal.valueOf(
+ taskInfo.getTaskStatus());
+ switch (taskState) {
+ case SUCCEEDED:
+ if (successfulAttempt != null) {
+ sendTaskSucceededEvents();
+ } else {
+ LOG.info("Missing successful attempt for task " + taskId
+ + ", recovering as RUNNING");
+ // there must have been a fetch failure and the retry wasn't complete
+ taskState = TaskStateInternal.RUNNING;
+ metrics.runningTask(this);
+ addAndScheduleAttempt(Avataar.VIRGIN);
+ }
+ break;
+ case FAILED:
+ case KILLED:
+ {
+ if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
+ metrics.endWaitingTask(this);
+ }
+ TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
+ taskInfo.getFinishTime(), taskInfo.getTaskType(),
+ taskInfo.getError(), taskInfo.getTaskStatus(),
+ taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
+ eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+ eventHandler.handle(
+ new JobTaskEvent(taskId, getExternalState(taskState)));
+ break;
+ }
+ default:
+ throw new java.lang.AssertionError("Unexpected recovered task state: "
+ + taskState);
+ }
+
+ return taskState;
+ }
+
+ /**
+ * Handles T_RECOVER in the NEW state by delegating to
+ * {@link TaskImpl#recover}; the task moves to the state
+ * (FAILED/KILLED/RUNNING/SUCCEEDED) returned by recovery.
+ */
+ private static class RecoverTransition
+ implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+ @Override
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+ TaskRecoverEvent tre = (TaskRecoverEvent) event;
+ return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(),
+ tre.getRecoverTaskOutput());
+ }
+ }
+
private static class InitialScheduleTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@@ -758,13 +870,7 @@ public abstract class TaskImpl implement
public void transition(TaskImpl task, TaskEvent event) {
task.addAndScheduleAttempt(Avataar.VIRGIN);
task.scheduledTime = task.clock.getTime();
- TaskStartedEvent tse = new TaskStartedEvent(
- TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
- TypeConverter.fromYarn(task.taskId.getTaskType()),
- task.getSplitsAsString());
- task.eventHandler
- .handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
- task.historyTaskStartGenerated = true;
+ task.sendTaskStartedEvent();
}
}
@@ -818,16 +924,7 @@ public abstract class TaskImpl implement
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
task.successfulAttempt = taskAttemptId;
- task.eventHandler.handle(new JobTaskEvent(
- task.taskId, TaskState.SUCCEEDED));
- LOG.info("Task succeeded with attempt " + task.successfulAttempt);
- // issue kill to all other attempts
- if (task.historyTaskStartGenerated) {
- TaskFinishedEvent tfe = createTaskFinishedEvent(task,
- TaskStateInternal.SUCCEEDED);
- task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
- tfe));
- }
+ task.sendTaskSucceededEvents();
for (TaskAttempt attempt : task.attempts.values()) {
if (attempt.getID() != task.successfulAttempt &&
// This is okay because it can only talk us out of sending a
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1466770&r1=1466769&r2=1466770&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Apr 11 05:02:45 2013
@@ -414,7 +414,8 @@ public class MRApp extends MRAppMaster {
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ getCommitter(), isNewApiCommitter(),
+ currentUser.getUserName(), getContext(),
forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
@@ -648,12 +649,13 @@ public class MRApp extends MRAppMaster {
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
- boolean newApiCommitter, String user, AppContext appContext,
+ OutputCommitter committer, boolean newApiCommitter,
+ String user, AppContext appContext,
JobStateInternal forcedState, String diagnostic) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
- getCompletedTaskFromPreviousRun(), metrics,
+ getCompletedTaskFromPreviousRun(), metrics, committer,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
appContext, forcedState, diagnostic);