Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/04/26 10:01:05 UTC
svn commit: r1096692 - in /hadoop/mapreduce/branches/MR-279/mr-client:
hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-clie...
Author: sharad
Date: Tue Apr 26 08:01:04 2011
New Revision: 1096692
URL: http://svn.apache.org/viewvc?rev=1096692&view=rev
Log:
Recovery of MR Application Master from failures.
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
Modified:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Apr 26 08:01:04 2011
@@ -56,6 +56,7 @@ public class JobHistoryEventHandler exte
implements EventHandler<JobHistoryEvent> {
private final AppContext context;
+ private final int startCount;
private FileContext logDirFc; // log Dir FileContext
private FileContext doneDirFc; // done Dir FileContext
@@ -80,9 +81,10 @@ public class JobHistoryEventHandler exte
public static final FsPermission HISTORY_FILE_PERMISSION =
FsPermission.createImmutable((short) 0740); // rwxr-----
- public JobHistoryEventHandler(AppContext context) {
+ public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
this.context = context;
+ this.startCount = startCount;
}
@Override
@@ -344,8 +346,10 @@ public class JobHistoryEventHandler exte
/**
* Get the job history file path
*/
- public static Path getJobHistoryFile(Path dir, JobId jobId) {
- return new Path(dir, TypeConverter.fromYarn(jobId).toString());
+ private Path getJobHistoryFile(Path dir, JobId jobId) {
+ return new Path(dir, TypeConverter.fromYarn(jobId).toString() + "_" +
+ startCount);
+
}
/*
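The startCount suffix added above is the hook the new recovery path relies on: each life of the application master writes its history as <jobid>_<startCount>, and RecoveryService.parse() (further down in this commit) opens the file written by the previous life. A minimal sketch of the naming, where dir, jobId and startCount stand in for whatever the caller supplies:

    // Hedged sketch, not part of the commit: how the per-life history file
    // names line up between the writer (this class) and the recovery reader.
    Path currentLifeFile =
        new Path(dir, TypeConverter.fromYarn(jobId).toString() + "_" + startCount);
    Path previousLifeFile =
        new Path(dir, TypeConverter.fromYarn(jobId).toString() + "_" + (startCount - 1));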
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Apr 26 08:01:04 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -56,6 +58,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
+import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
@@ -102,9 +106,10 @@ public class MRAppMaster extends Composi
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
- private final Clock clock;
-
- private ApplicationId appID;
+ private Clock clock;
+ private final int startCount;
+ private final ApplicationId appID;
+ private Set<TaskId> completedTasksFromPreviousRun;
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
@@ -117,21 +122,16 @@ public class MRAppMaster extends Composi
new JobTokenSecretManager();
private Job job;
- private int failCount = 0;
-
- public MRAppMaster(ApplicationId applicationId) {
- this(applicationId, new SystemClock());
- }
- public MRAppMaster(ApplicationId applicationId, int failCount) {
- this(applicationId);
- this.failCount = failCount;
+ public MRAppMaster(ApplicationId applicationId, int startCount) {
+ this(applicationId, new SystemClock(), startCount);
}
-
- public MRAppMaster(ApplicationId applicationId, Clock clock) {
+
+ public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.appID = applicationId;
+ this.startCount = startCount;
LOG.info("Created MRAppMaster for application " + applicationId);
}
@@ -139,8 +139,18 @@ public class MRAppMaster extends Composi
public void init(final Configuration conf) {
context = new RunningAppContext();
- dispatcher = new AsyncDispatcher();
- addIfService(dispatcher);
+ if (conf.getBoolean(YarnMRJobConfig.RECOVERY_ENABLE, false)
+ && startCount > 1) {
+ LOG.info("Recovery is enabled. Will try to recover from previous life.");
+ Recovery recoveryServ = new RecoveryService(appID, clock, startCount);
+ addIfService(recoveryServ);
+ dispatcher = recoveryServ.getDispatcher();
+ clock = recoveryServ.getClock();
+ completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
+ } else {
+ dispatcher = new AsyncDispatcher();
+ addIfService(dispatcher);
+ }
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context);
@@ -259,7 +269,7 @@ public class MRAppMaster extends Composi
// create single job
Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens,
- clock);
+ clock, startCount, completedTasksFromPreviousRun);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@@ -312,7 +322,8 @@ public class MRAppMaster extends Composi
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
- JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context);
+ JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
+ getStartCount());
return eventHandler;
}
@@ -385,6 +396,10 @@ public class MRAppMaster extends Composi
return appID;
}
+ public int getStartCount() {
+ return startCount;
+ }
+
public AppContext getContext() {
return context;
}
@@ -393,6 +408,10 @@ public class MRAppMaster extends Composi
return dispatcher;
}
+ public Set<TaskId> getCompletedTaskFromPreviousRun() {
+ return completedTasksFromPreviousRun;
+ }
+
//Returns null if speculation is not enabled
public Speculator getSpeculator() {
return speculator;
@@ -503,7 +522,7 @@ public class MRAppMaster extends Composi
applicationId.setClusterTimestamp(Long.valueOf(args[0]));
applicationId.setId(Integer.valueOf(args[1]));
int failCount = Integer.valueOf(args[2]);
- MRAppMaster appMaster = new MRAppMaster(applicationId, failCount);
+ MRAppMaster appMaster = new MRAppMaster(applicationId, ++failCount);
YarnConfiguration conf = new YarnConfiguration(new JobConf());
conf.addResource(new Path(YARNApplicationConstants.JOB_CONF_FILE));
conf.set(MRJobConfig.USER_NAME,
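Note how the start count reaches the constructor above: args[2] carries the number of previous AM failures, and main() now increments it before building the master, so the first life runs with startCount == 1 and the recovery branch in init() (RECOVERY_ENABLE and startCount > 1) only fires from the second life onward. A condensed, illustrative fragment of that handoff, with the argument layout taken from this commit:

    // Illustrative fragment only; conf and applicationId come from the
    // surrounding main() shown in the diff above.
    int failCount = Integer.valueOf(args[2]);                  // 0 on the first launch
    MRAppMaster appMaster = new MRAppMaster(applicationId, ++failCount);
    boolean mayRecover = conf.getBoolean(YarnMRJobConfig.RECOVERY_ENABLE, false)
        && appMaster.getStartCount() > 1;                      // false until the second life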
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Apr 26 08:01:04 2011
@@ -127,6 +127,8 @@ public class JobImpl implements org.apac
//final fields
private final Clock clock;
+ private final int startCount;
+ private final Set<TaskId> completedTasksFromPreviousRun;
private final Lock readLock;
private final Lock writeLock;
private final JobId jobId;
@@ -341,11 +343,14 @@ public class JobImpl implements org.apac
public JobImpl(ApplicationId appID, Configuration conf,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
- Credentials fsTokenCredentials, Clock clock) {
+ Credentials fsTokenCredentials, Clock clock, int startCount,
+ Set<TaskId> completedTasksFromPreviousRun) {
this.jobId = recordFactory.newRecordInstance(JobId.class);
this.conf = conf;
this.clock = clock;
+ this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
+ this.startCount = startCount;
jobId.setAppId(appID);
jobId.setId(appID.getId());
oldJobId = TypeConverter.fromYarn(jobId);
@@ -900,7 +905,7 @@ public class JobImpl implements org.apac
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens.getAllTokens(),
- job.clock);
+ job.clock, job.completedTasksFromPreviousRun, job.startCount);
job.addTask(task);
}
LOG.info("Input size for job " + job.jobId + " = " + inputLength
@@ -915,7 +920,8 @@ public class JobImpl implements org.apac
job.remoteJobConfFile,
job.conf, job.numMapTasks,
job.taskAttemptListener, job.committer, job.jobToken,
- job.fsTokens.getAllTokens(), job.clock);
+ job.fsTokens.getAllTokens(), job.clock,
+ job.completedTasksFromPreviousRun, job.startCount);
job.addTask(task);
}
LOG.info("Number of reduces for job " + job.jobId + " = "
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Tue Apr 26 08:01:04 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.security.token.Token;
@@ -46,9 +48,11 @@ public class MapTaskImpl extends TaskImp
TaskSplitMetaInfo taskSplitMetaInfo,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
- Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
+ Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+ Set<TaskId> completedTasksFromPreviousRun, int startCount) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
- conf, taskAttemptListener, committer, jobToken, fsTokens, clock);
+ conf, taskAttemptListener, committer, jobToken, fsTokens, clock,
+ completedTasksFromPreviousRun, startCount);
this.taskSplitMetaInfo = taskSplitMetaInfo;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Tue Apr 26 08:01:04 2011
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Collection;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -27,6 +28,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.security.token.Token;
@@ -42,9 +44,11 @@ public class ReduceTaskImpl extends Task
EventHandler eventHandler, Path jobFile, Configuration conf,
int numMapTasks, TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
- Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
+ Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+ Set<TaskId> completedTasksFromPreviousRun, int startCount) {
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
- taskAttemptListener, committer, jobToken, fsTokens, clock);
+ taskAttemptListener, committer, jobToken, fsTokens, clock,
+ completedTasksFromPreviousRun, startCount);
this.numMapTasks = numMapTasks;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Apr 26 08:01:04 2011
@@ -989,30 +989,7 @@ public abstract class TaskAttemptImpl im
String taskType =
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
LOG.info("In TaskAttemptImpl taskType: " + taskType);
- if (taskType.equals("MAP")) {
- MapAttemptFinishedEvent mfe =
- new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
- TaskAttemptState.SUCCEEDED.toString(),
- taskAttempt.finishTime,
- taskAttempt.finishTime, "hostname",
- TaskAttemptState.SUCCEEDED.toString(),
- TypeConverter.fromYarn(taskAttempt.getCounters()),null);
- taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe));
- } else {
- ReduceAttemptFinishedEvent rfe =
- new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
- TaskAttemptState.SUCCEEDED.toString(),
- taskAttempt.finishTime,
- taskAttempt.finishTime,
- taskAttempt.finishTime, "hostname",
- TaskAttemptState.SUCCEEDED.toString(),
- TypeConverter.fromYarn(taskAttempt.getCounters()),null);
- taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe));
- }
+ taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
/*
TaskAttemptFinishedEvent tfe =
new TaskAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
@@ -1047,36 +1024,40 @@ public abstract class TaskAttemptImpl im
taskAttempt.reportedStatus.diagnosticInfo.toString());
taskAttempt.eventHandler.handle(
new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), ta));
- if (taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP) {
- MapAttemptFinishedEvent mfe =
- new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
- TaskAttemptState.FAILED.toString(),
- taskAttempt.finishTime,
- taskAttempt.finishTime, "hostname",
- TaskAttemptState.FAILED.toString(),
- TypeConverter.fromYarn(taskAttempt.getCounters()),null);
- taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe));
- } else {
- ReduceAttemptFinishedEvent rfe =
- new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
- TaskAttemptState.FAILED.toString(),
- taskAttempt.finishTime,
- taskAttempt.finishTime,
- taskAttempt.finishTime, "hostname",
- TaskAttemptState.FAILED.toString(),
- TypeConverter.fromYarn(taskAttempt.getCounters()),null);
- taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe));
- }
+ taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED);
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_FAILED));
}
}
+ private void logAttemptFinishedEvent(TaskAttemptState state) {
+ if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
+ MapAttemptFinishedEvent mfe =
+ new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ state.toString(),
+ finishTime,
+ finishTime, "hostname",
+ state.toString(),
+ TypeConverter.fromYarn(getCounters()),null);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
+ } else {
+ ReduceAttemptFinishedEvent rfe =
+ new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ state.toString(),
+ finishTime,
+ finishTime,
+ finishTime, "hostname",
+ state.toString(),
+ TypeConverter.fromYarn(getCounters()),null);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
+ }
+ }
+
private static class TooManyFetchFailureTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
@@ -1108,6 +1089,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.reportedStatus.diagnosticInfo.toString());
taskAttempt.eventHandler.handle(
new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tke));
+ taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED);
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Apr 26 08:01:04 2011
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -219,7 +220,8 @@ public abstract class TaskImpl implement
EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
- Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
+ Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+ Set<TaskId> completedTasksFromPreviousRun, int startCount) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
@@ -242,6 +244,18 @@ public abstract class TaskImpl implement
this.fsTokens = fsTokens;
this.jobToken = jobToken;
+ if (completedTasksFromPreviousRun != null
+ && completedTasksFromPreviousRun.contains(taskId)) {
+ LOG.info("Task is from previous run " + taskId);
+ startCount = startCount - 1;
+ }
+
+ //attempt ids are generated based on MR app startCount so that attempts
+ //from previous lives don't overstep the current one.
+ //this assumes that a task won't have more than 1000 attempts in its single
+ //life
+ nextAttemptNumber = (startCount - 1) * 1000;
+
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
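The decrement for recovered tasks plus the (startCount - 1) * 1000 base keeps attempt ids from different AM lives in disjoint bands: a task replayed from the previous life keeps its original numbering, while genuinely new attempts start in the band of the current life. A small, self-contained illustration of the arithmetic (assuming the 1000-attempts-per-life limit stated in the comment above):

    // Illustrative only; prints the first attempt id used by each AM life.
    public class AttemptIdBands {
      public static void main(String[] args) {
        for (int startCount = 1; startCount <= 3; startCount++) {
          int firstNewAttemptId = (startCount - 1) * 1000;
          System.out.println("AM life " + startCount
              + ": new attempt ids start at " + firstNewAttemptId);
        }
        // A task recovered from the previous life is constructed with
        // startCount - 1, so its replayed attempts keep their original band.
      }
    }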
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java?rev=1096692&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java Tue Apr 26 08:01:04 2011
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.v2.app.recover;
+
+import org.apache.hadoop.yarn.Clock;
+
+class ControlledClock implements Clock {
+ private long time = -1;
+ private final Clock actualClock;
+ ControlledClock(Clock actualClock) {
+ this.actualClock = actualClock;
+ }
+ synchronized void setTime(long time) {
+ this.time = time;
+ }
+ synchronized void reset() {
+ time = -1;
+ }
+
+ @Override
+ public synchronized long getTime() {
+ if (time != -1) {
+ return time;
+ }
+ return actualClock.getTime();
+ }
+
+}
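ControlledClock is what lets the recovery service stamp replayed events with their recorded times: while a time is set, getTime() returns it; after reset(), calls fall through to the wrapped clock. A usage sketch (hypothetical demo class in the same package, since ControlledClock is package-private; the wrapped clock here is a trivial wall-clock implementation):

    package org.apache.hadoop.mapreduce.v2.app.recover;

    import org.apache.hadoop.yarn.Clock;

    public class ControlledClockDemo {
      public static void main(String[] args) {
        Clock wallClock = new Clock() {
          @Override
          public long getTime() { return System.currentTimeMillis(); }
        };
        ControlledClock clock = new ControlledClock(wallClock);
        clock.setTime(1303804864000L);        // e.g. a recorded job launch time
        System.out.println(clock.getTime());  // pinned value
        clock.reset();
        System.out.println(clock.getTime());  // real wall-clock time again
      }
    }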
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java?rev=1096692&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java Tue Apr 26 08:01:04 2011
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.recover;
+
+import java.util.Set;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.Dispatcher;
+
+public interface Recovery {
+
+ Dispatcher getDispatcher();
+
+ Clock getClock();
+
+ Set<TaskId> getCompletedTasks();
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1096692&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Apr 26 08:01:04 2011
@@ -0,0 +1,363 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.recover;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+
+/*
+ * Recovers the completed tasks from the previous life of Application Master.
+ * The completed tasks are deciphered from the history file of the previous life.
+ * The recovery service intercepts and replays the events for completed tasks.
+ * While recovery is in progress, the scheduling of new tasks is delayed by
+ * buffering the task schedule events.
+ * The recovery service controls the clock while recovery is in progress.
+ */
+
+//TODO:
+//task cleanup for all non completed tasks
+//change job output committer to have
+// - atomic job output promotion
+// - recover output of completed tasks
+
+public class RecoveryService extends CompositeService implements Recovery {
+
+ private static final Log LOG = LogFactory.getLog(RecoveryService.class);
+
+ private final ApplicationId appID;
+ private final Dispatcher dispatcher;
+ private final ControlledClock clock;
+ private final int startCount;
+
+ private JobInfo jobInfo = null;
+ private final Map<TaskId, TaskInfo> completedTasks =
+ new HashMap<TaskId, TaskInfo>();
+
+ private final List<TaskEvent> pendingTaskScheduleEvents =
+ new ArrayList<TaskEvent>();
+
+ private volatile boolean recoveryMode = false;
+
+ public RecoveryService(ApplicationId appID, Clock clock, int startCount) {
+ super("RecoveringDispatcher");
+ this.appID = appID;
+ this.startCount = startCount;
+ this.dispatcher = new RecoveryDispatcher();
+ this.clock = new ControlledClock(clock);
+ if (dispatcher instanceof Service) {
+ addService((Service) dispatcher);
+ }
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ // parse the history file
+ try {
+ parse();
+ if (completedTasks.size() > 0) {
+ recoveryMode = true;
+ LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " +
+ "TO RECOVER " + completedTasks.size());
+ LOG.info("Job launch time " + jobInfo.getLaunchTime());
+ clock.setTime(jobInfo.getLaunchTime());
+ }
+ } catch (IOException e) {
+ LOG.warn(e);
+ LOG.warn("Could not parse the old history file. Aborting recovery. "
+ + "Starting afresh.");
+ }
+ }
+
+ @Override
+ public Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ @Override
+ public Clock getClock() {
+ return clock;
+ }
+
+ @Override
+ public Set<TaskId> getCompletedTasks() {
+ return completedTasks.keySet();
+ }
+
+ private void parse() throws IOException {
+ // TODO: parse history file based on startCount
+ String jobName = TypeConverter.fromYarn(appID).toString();
+ String defaultStagingDir = getConfig().get(
+ YARNApplicationConstants.APPS_STAGING_DIR_KEY)
+ + "/history/staging";
+ String jobhistoryDir = getConfig().get(
+ YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir);
+ FSDataInputStream in = null;
+ Path historyFile = null;
+ Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
+ new Path(jobhistoryDir));
+ FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
+ getConfig());
+ historyFile = fc.makeQualified(new Path(histDirPath, jobName + "_" +
+ (startCount -1))); //read the previous history file
+ in = fc.open(historyFile);
+ JobHistoryParser parser = new JobHistoryParser(in);
+ jobInfo = parser.parse();
+ Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+ .getAllTasks();
+ for (TaskInfo taskInfo : taskInfos.values()) {
+ if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+ completedTasks
+ .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+ LOG.info("Read from history task "
+ + TypeConverter.toYarn(taskInfo.getTaskId()));
+ }
+ }
+ LOG.info("Read completed tasks from history "
+ + completedTasks.size());
+ }
+
+ class RecoveryDispatcher extends AsyncDispatcher {
+ private final EventHandler actualHandler;
+ private final EventHandler handler;
+
+ RecoveryDispatcher() {
+ actualHandler = super.getEventHandler();
+ handler = new InterceptingEventHandler(actualHandler);
+ }
+
+ @Override
+ public void dispatch(Event event) {
+ if (recoveryMode) {
+ if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
+ TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+ .getTaskAttemptID());
+ LOG.info("Attempt start time " + attInfo.getStartTime());
+ clock.setTime(attInfo.getStartTime());
+
+ } else if (event.getType() == TaskAttemptEventType.TA_DONE
+ || event.getType() == TaskAttemptEventType.TA_FAILMSG
+ || event.getType() == TaskAttemptEventType.TA_KILL) {
+ TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+ .getTaskAttemptID());
+ LOG.info("Attempt finish time " + attInfo.getFinishTime());
+ clock.setTime(attInfo.getFinishTime());
+ }
+
+ else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
+ || event.getType() == TaskEventType.T_ATTEMPT_KILLED
+ || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
+ TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
+ LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
+ TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
+ .getTaskId());
+ taskInfo.getAllTaskAttempts().remove(
+ TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
+ // remove the task info from completed tasks if all attempts are
+ // recovered
+ if (taskInfo.getAllTaskAttempts().size() == 0) {
+ completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
+ // checkForRecoveryComplete
+ LOG.info("CompletedTasks() " + completedTasks.size());
+ if (completedTasks.size() == 0) {
+ recoveryMode = false;
+ clock.reset();
+ LOG.info("Setting the recovery mode to false. " +
+ "Recovery is complete!");
+
+ // send all pending tasks schedule events
+ for (TaskEvent tEv : pendingTaskScheduleEvents) {
+ actualHandler.handle(tEv);
+ }
+
+ }
+ }
+ }
+ }
+ super.dispatch(event);
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return handler;
+ }
+ }
+
+ private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
+ TaskInfo taskInfo = completedTasks.get(id.getTaskId());
+ return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
+ }
+
+ private class InterceptingEventHandler implements EventHandler {
+ EventHandler actualHandler;
+
+ InterceptingEventHandler(EventHandler actualHandler) {
+ this.actualHandler = actualHandler;
+ }
+
+ @Override
+ public void handle(Event event) {
+ if (!recoveryMode) {
+ // delegate to the dispatcher one
+ actualHandler.handle(event);
+ return;
+ }
+
+ else if (event.getType() == TaskEventType.T_SCHEDULE) {
+ TaskEvent taskEvent = (TaskEvent) event;
+ // delay the scheduling of new tasks till previous ones are recovered
+ if (completedTasks.get(taskEvent.getTaskID()) == null) {
+ LOG.debug("Adding to pending task events "
+ + taskEvent.getTaskID());
+ pendingTaskScheduleEvents.add(taskEvent);
+ return;
+ }
+ }
+
+ else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
+ TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
+ TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+ LOG.debug("CONTAINER_REQ " + aId);
+ sendAssignedEvent(aId, attInfo);
+ return;
+ }
+
+ else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
+ TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
+ LOG.debug("TASK_CLEAN");
+ actualHandler.handle(new TaskAttemptEvent(aId,
+ TaskAttemptEventType.TA_CLEANUP_DONE));
+ return;
+ }
+
+ else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
+ TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
+ .getTaskAttemptID();
+ TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+ actualHandler.handle(new TaskAttemptEvent(aId,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+ // send the status update event
+ sendStatusUpdateEvent(aId, attInfo);
+
+ TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getState());
+ switch (state) {
+ case SUCCEEDED:
+ // send the done event
+ LOG.info("Sending done event to " + aId);
+ actualHandler.handle(new TaskAttemptEvent(aId,
+ TaskAttemptEventType.TA_DONE));
+ break;
+ case KILLED:
+ LOG.info("Sending kill event to " + aId);
+ actualHandler.handle(new TaskAttemptEvent(aId,
+ TaskAttemptEventType.TA_KILL));
+ break;
+ default:
+ LOG.info("Sending fail event to " + aId);
+ actualHandler.handle(new TaskAttemptEvent(aId,
+ TaskAttemptEventType.TA_FAILMSG));
+ break;
+ }
+ return;
+ }
+
+ // delegate to the actual handler
+ actualHandler.handle(event);
+ }
+
+ private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
+ TaskAttemptInfo attemptInfo) {
+ LOG.info("Sending status update event to " + yarnAttemptID);
+ TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+ taskAttemptStatus.id = yarnAttemptID;
+ taskAttemptStatus.progress = 1.0f;
+ taskAttemptStatus.diagnosticInfo = "";
+ taskAttemptStatus.stateString = attemptInfo.getState();
+ // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
+ taskAttemptStatus.phase = Phase.CLEANUP;
+ org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters();
+ if (cntrs == null) {
+ taskAttemptStatus.counters = null;
+ } else {
+ taskAttemptStatus.counters = TypeConverter.toYarn(attemptInfo
+ .getCounters());
+ }
+ actualHandler.handle(new TaskAttemptStatusUpdateEvent(
+ taskAttemptStatus.id, taskAttemptStatus));
+ }
+
+ private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
+ TaskAttemptInfo attemptInfo) {
+ LOG.info("Sending assigned event to " + yarnAttemptID);
+ ContainerId cId = RecordFactoryProvider.getRecordFactory(null)
+ .newRecordInstance(ContainerId.class);
+ actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
+ cId, null, attemptInfo.getHostname() + ":" +
+ attemptInfo.getHttpPort(), null));
+ }
+ }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Apr 26 08:01:04 2011
@@ -85,10 +85,20 @@ public class MRApp extends MRAppMaster {
//if true, tasks complete automatically as soon as they are launched
protected boolean autoComplete = false;
+ static ApplicationId applicationId;
+
+ static {
+ applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+ applicationId.setClusterTimestamp(0);
+ applicationId.setId(0);
+ }
+
public MRApp(int maps, int reduces, boolean autoComplete) {
-
- super(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
- ApplicationId.class));
+ this(maps, reduces, autoComplete, 1);
+ }
+
+ public MRApp(int maps, int reduces, boolean autoComplete, int startCount) {
+ super(applicationId, startCount);
this.maps = maps;
this.reduces = reduces;
this.autoComplete = autoComplete;
@@ -163,10 +173,14 @@ public class MRApp extends MRAppMaster {
JobReport jobReport = job.getReport();
Assert.assertTrue("Job start time is not less than finish time",
jobReport.getStartTime() < jobReport.getFinishTime());
+ System.out.println("Job start time :" + jobReport.getStartTime());
+ System.out.println("Job finish time :" + jobReport.getFinishTime());
Assert.assertTrue("Job finish time is in future",
jobReport.getFinishTime() < System.currentTimeMillis());
for (Task task : job.getTasks().values()) {
TaskReport taskReport = task.getReport();
+ System.out.println("Task start time : " + taskReport.getStartTime());
+ System.out.println("Task finish time : " + taskReport.getFinishTime());
Assert.assertTrue("Task start time is not less than finish time",
taskReport.getStartTime() < taskReport.getFinishTime());
for (TaskAttempt attempt : task.getAttempts().values()) {
@@ -310,7 +324,8 @@ public class MRApp extends MRAppMaster {
public TestJob(ApplicationId appID, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock) {
super(appID, new Configuration(), eventHandler, taskAttemptListener,
- new JobTokenSecretManager(), new Credentials(), clock);
+ new JobTokenSecretManager(), new Credentials(), clock, getStartCount(),
+ getCompletedTaskFromPreviousRun());
// This "this leak" is okay because the retained pointer is in an
// instance variable.
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1096692&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Apr 26 08:01:04 2011
@@ -0,0 +1,185 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.junit.Test;
+
+public class TestRecovery {
+
+ private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+
+ @Test
+ public void testCrashed() throws Exception {
+ int runCount = 0;
+ MRApp app = new MRApp(2, 1, false, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ long jobStartTime = job.getReport().getStartTime();
+ //all maps would be running
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask1 = it.next();
+ Task mapTask2 = it.next();
+ Task reduceTask = it.next();
+
+ // all maps must be running
+ app.waitForState(mapTask1, TaskState.RUNNING);
+ app.waitForState(mapTask2, TaskState.RUNNING);
+
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+ TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+ //before sending the TA_DONE, event make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+ app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+ // reduces must be in NEW state
+ Assert.assertEquals("Reduce Task state not correct",
+ TaskState.NEW, reduceTask.getReport().getTaskState());
+
+ //send the fail signal to the 1st map task attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt1.getID(),
+ TaskAttemptEventType.TA_FAILMSG));
+
+ app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
+
+ while (mapTask1.getAttempts().size() != 2) {
+ Thread.sleep(2000);
+ LOG.info("Waiting for next attempt to start");
+ }
+ Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
+ itr.next();
+ TaskAttempt task1Attempt2 = itr.next();
+
+ app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+
+ //send the kill signal to the 1st map 2nd attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt2.getID(),
+ TaskAttemptEventType.TA_KILL));
+
+ app.waitForState(task1Attempt2, TaskAttemptState.KILLED);
+
+ while (mapTask1.getAttempts().size() != 3) {
+ Thread.sleep(2000);
+ LOG.info("Waiting for next attempt to start");
+ }
+ itr = mapTask1.getAttempts().values().iterator();
+ itr.next();
+ itr.next();
+ TaskAttempt task1Attempt3 = itr.next();
+
+ app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
+
+ //send the done signal to the 1st map 3rd attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt3.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for first map task to complete
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+ long task1StartTime = mapTask1.getReport().getStartTime();
+ long task1FinishTime = mapTask1.getReport().getFinishTime();
+
+ //stop the app
+ app.stop();
+
+ //rerun
+ //in rerun the 1st map will be recovered from previous run
+ app = new MRApp(2, 1, false, ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(YarnMRJobConfig.RECOVERY_ENABLE, true);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ //all maps would be running
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask1 = it.next();
+ mapTask2 = it.next();
+ reduceTask = it.next();
+
+ // first map will be recovered, no need to send done
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(mapTask2, TaskState.RUNNING);
+
+ task2Attempt = mapTask2.getAttempts().values().iterator().next();
+ //before sending the TA_DONE, event make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+ //send the done signal to the 2nd map task
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ mapTask2.getAttempts().values().iterator().next().getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait to get it completed
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+ //wait for reduce to be running before sending done
+ app.waitForState(reduceTask, TaskState.RUNNING);
+ //send the done signal to the reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduceTask.getAttempts().values().iterator().next().getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ Assert.assertEquals("Job Start time not correct",
+ jobStartTime, job.getReport().getStartTime());
+ Assert.assertEquals("Task Start time not correct",
+ task1StartTime, mapTask1.getReport().getStartTime());
+ Assert.assertEquals("Task Finish time not correct",
+ task1FinishTime, mapTask1.getReport().getFinishTime());
+ }
+
+ public static void main(String[] arg) throws Exception {
+ TestRecovery test = new TestRecovery();
+ test.testCrashed();
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Tue Apr 26 08:01:04 2011
@@ -53,4 +53,7 @@ public class YarnMRJobConfig {
"address.webapp";
public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
"0.0.0.0:19888";
+
+ public static final String RECOVERY_ENABLE
+ = "yarn.mapreduce.job.recovery.enable";
}
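The new key defaults to off; a job opts in explicitly, and even then MRAppMaster only attempts recovery from its second life onward (the same pattern TestRecovery uses above). A hedged, self-contained example of turning it on:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;

    // Minimal sketch: enable AM recovery via the key added in this commit.
    public class EnableRecovery {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(YarnMRJobConfig.RECOVERY_ENABLE, true);
        System.out.println("recovery enabled: "
            + conf.getBoolean(YarnMRJobConfig.RECOVERY_ENABLE, false));
      }
    }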
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=1096692&r1=1096691&r2=1096692&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Tue Apr 26 08:01:04 2011
@@ -68,6 +68,7 @@ class EventWriter {
void flush() throws IOException {
encoder.flush();
+ out.hflush();
}
void close() throws IOException {