You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/24 10:44:55 UTC
svn commit: r1188044 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org...
Author: vinodkv
Date: Mon Oct 24 08:44:54 2011
New Revision: 1188044
URL: http://svn.apache.org/viewvc?rev=1188044&view=rev
Log:
MAPREDUCE-2708. Designed and implemented MR Application Master recovery to make MR AMs resume their progress after restart. Contributed by Sharad Agarwal.
svn merge -c r1188043 --ignore-ancestry ../../trunk/
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Oct 24 08:44:54 2011
@@ -62,6 +62,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3104. Implemented Application-acls. (vinodkv)
+ MAPREDUCE-2708. Designed and implemented MR Application Master recovery to
+ make MR AMs resume their progress after restart. (Sharad Agarwal via vinodkv)
+
IMPROVEMENTS
MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Mon Oct 24 08:44:54 2011
@@ -29,11 +29,9 @@ import org.apache.hadoop.mapred.TaskLog.
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Apps;
public class MapReduceChildJVM {
@@ -131,6 +129,8 @@ public class MapReduceChildJVM {
MRJobConfig.STDERR_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDERR)
);
+ environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
+ conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
}
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Mon Oct 24 08:44:54 2011
@@ -239,6 +239,14 @@ class YarnChild {
Token<JobTokenIdentifier> jt) throws IOException {
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
job.setCredentials(credentials);
+
+ String appAttemptIdEnv = System
+ .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+ LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
+ // Set it in conf so that it can be used by the OutputCommitter.
+ job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
+ .parseInt(appAttemptIdEnv));
+
// set tcp nodelay
job.setBoolean("ipc.client.tcpnodelay", true);
job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Oct 24 08:44:54 2011
@@ -36,18 +36,25 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
-import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -76,12 +83,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
@@ -93,6 +102,8 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
@@ -121,6 +132,9 @@ public class MRAppMaster extends Composi
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
private Clock clock;
private final long startTime;
private final long appSubmitTime;
@@ -143,6 +157,9 @@ public class MRAppMaster extends Composi
private TaskAttemptListener taskAttemptListener;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
+ private JobId jobId;
+ private boolean newApiCommitter;
+ private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private boolean inRecovery = false;
@@ -182,15 +199,39 @@ public class MRAppMaster extends Composi
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
- if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
- && appAttemptID.getAttemptId() > 1) {
- LOG.info("Recovery is enabled. Will try to recover from previous life.");
- recoveryServ = new RecoveryService(appAttemptID, clock);
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+
+ newApiCommitter = false;
+ jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
+ appAttemptID.getApplicationId().getId());
+ int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ if ((numReduceTasks > 0 &&
+ conf.getBoolean("mapred.reducer.new-api", false)) ||
+ (numReduceTasks == 0 &&
+ conf.getBoolean("mapred.mapper.new-api", false))) {
+ newApiCommitter = true;
+ LOG.info("Using mapred newApiCommitter.");
+ }
+
+ committer = createOutputCommitter(conf);
+ boolean recoveryEnabled = conf.getBoolean(
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+ if (recoveryEnabled && recoverySupportedByCommitter
+ && appAttemptID.getAttemptId() > 1) {
+ LOG.info("Recovery is enabled. "
+ + "Will try to recover from previous life on best effort basis.");
+ recoveryServ = new RecoveryService(appAttemptID, clock,
+ committer);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
inRecovery = true;
} else {
+ LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ + recoveryEnabled + " recoverySupportedByCommitter: "
+ + recoverySupportedByCommitter + " ApplicationAttemptID: "
+ + appAttemptID.getAttemptId());
dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
}
@@ -253,7 +294,36 @@ public class MRAppMaster extends Composi
super.init(conf);
} // end of init()
-
+ private OutputCommitter createOutputCommitter(Configuration conf) {
+ OutputCommitter committer = null;
+
+ LOG.info("OutputCommitter set in config "
+ + conf.get("mapred.output.committer.class"));
+
+ if (newApiCommitter) {
+ org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
+ .newTaskId(jobId, 0, TaskType.MAP);
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
+ .newTaskAttemptId(taskID, 0);
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attemptID));
+ OutputFormat outputFormat;
+ try {
+ outputFormat = ReflectionUtils.newInstance(taskContext
+ .getOutputFormatClass(), conf);
+ committer = outputFormat.getOutputCommitter(taskContext);
+ } catch (Exception e) {
+ throw new YarnException(e);
+ }
+ } else {
+ committer = ReflectionUtils.newInstance(conf.getClass(
+ "mapred.output.committer.class", FileOutputCommitter.class,
+ org.apache.hadoop.mapred.OutputCommitter.class), conf);
+ }
+ LOG.info("OutputCommitter is " + committer.getClass().getName());
+ return committer;
+ }
+
protected boolean keepJobFiles(JobConf conf) {
return (conf.getKeepTaskFilesPattern() != null || conf
.getKeepFailedTaskFiles());
@@ -348,10 +418,10 @@ public class MRAppMaster extends Composi
protected Job createJob(Configuration conf) {
// create single job
- Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
- taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
- completedTasksFromPreviousRun, metrics, currentUser.getUserName(),
- appSubmitTime, amInfos);
+ Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher
+ .getEventHandler(), taskAttemptListener, jobTokenSecretManager,
+ fsTokens, clock, completedTasksFromPreviousRun, metrics, committer,
+ newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@@ -471,6 +541,22 @@ public class MRAppMaster extends Composi
return appAttemptID.getApplicationId();
}
+ public ApplicationAttemptId getAttemptID() {
+ return appAttemptID;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public OutputCommitter getCommitter() {
+ return committer;
+ }
+
+ public boolean isNewApiCommitter() {
+ return newApiCommitter;
+ }
+
public int getStartCount() {
return appAttemptID.getAttemptId();
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Oct 24 08:44:54 2011
@@ -39,15 +39,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -64,7 +61,6 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@@ -98,14 +94,11 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -126,15 +119,13 @@ public class JobImpl implements org.apac
// Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
-
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
//final fields
private final ApplicationAttemptId applicationAttemptId;
private final Clock clock;
private final JobACLsManager aclsManager;
private final String username;
+ private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs;
private final Set<TaskId> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos;
@@ -142,6 +133,7 @@ public class JobImpl implements org.apac
private final Lock writeLock;
private final JobId jobId;
private final String jobName;
+ private final boolean newApiCommitter;
private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private final Object tasksSyncHandle = new Object();
@@ -167,7 +159,6 @@ public class JobImpl implements org.apac
private Path remoteJobSubmitDir;
public Path remoteJobConfFile;
private JobContext jobContext;
- private OutputCommitter committer;
private int allowedMapFailuresPercent = 0;
private int allowedReduceFailuresPercent = 0;
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
@@ -367,14 +358,16 @@ public class JobImpl implements org.apac
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
- public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
- EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+ public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
+ Configuration conf, EventHandler eventHandler,
+ TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
- Credentials fsTokenCredentials, Clock clock,
+ Credentials fsTokenCredentials, Clock clock,
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
- String userName, long appSubmitTime, List<AMInfo> amInfos) {
+ OutputCommitter committer, boolean newApiCommitter, String userName,
+ long appSubmitTime, List<AMInfo> amInfos) {
this.applicationAttemptId = applicationAttemptId;
- this.jobId = recordFactory.newRecordInstance(JobId.class);
+ this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = conf;
this.metrics = metrics;
@@ -383,15 +376,9 @@ public class JobImpl implements org.apac
this.amInfos = amInfos;
this.userName = userName;
this.appSubmitTime = appSubmitTime;
- ApplicationId applicationId = applicationAttemptId.getApplicationId();
- jobId.setAppId(applicationId);
- jobId.setId(applicationId.getId());
- oldJobId = TypeConverter.fromYarn(jobId);
- LOG.info("Job created" +
- " appId=" + applicationId +
- " jobId=" + jobId +
- " oldJobId=" + oldJobId);
-
+ this.oldJobId = TypeConverter.fromYarn(jobId);
+ this.newApiCommitter = newApiCommitter;
+
this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -400,6 +387,7 @@ public class JobImpl implements org.apac
this.fsTokens = fsTokenCredentials;
this.jobTokenSecretManager = jobTokenSecretManager;
+ this.committer = committer;
this.aclsManager = new JobACLsManager(conf);
this.username = System.getProperty("user.name");
@@ -854,47 +842,13 @@ public class JobImpl implements org.apac
checkTaskLimits();
-
- boolean newApiCommitter = false;
- if ((job.numReduceTasks > 0 &&
- job.conf.getBoolean("mapred.reducer.new-api", false)) ||
- (job.numReduceTasks == 0 &&
- job.conf.getBoolean("mapred.mapper.new-api", false))) {
- newApiCommitter = true;
- LOG.info("Using mapred newApiCommitter.");
- }
-
- LOG.info("OutputCommitter set in config " + job.conf.get(
- "mapred.output.committer.class"));
-
- if (newApiCommitter) {
+ if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
- = RecordFactoryProvider.getRecordFactory(null)
- .newRecordInstance(
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
- attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
- .newRecordInstance(TaskId.class));
- attemptID.getTaskId().setJobId(job.jobId);
- attemptID.getTaskId().setTaskType(TaskType.MAP);
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
- TypeConverter.fromYarn(attemptID));
- try {
- OutputFormat outputFormat = ReflectionUtils.newInstance(
- taskContext.getOutputFormatClass(), job.conf);
- job.committer = outputFormat.getOutputCommitter(taskContext);
- } catch(Exception e) {
- throw new IOException("Failed to assign outputcommitter", e);
- }
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
new JobConf(job.conf), job.oldJobId);
- job.committer = ReflectionUtils.newInstance(
- job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
- org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
}
- LOG.info("OutputCommitter is " + job.committer.getClass().getName());
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon Oct 24 08:44:54 2011
@@ -32,17 +32,23 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -53,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
@@ -84,9 +91,6 @@ import org.apache.hadoop.yarn.service.Se
//TODO:
//task cleanup for all non completed tasks
-//change job output committer to have
-// - atomic job output promotion
-// - recover output of completed tasks
public class RecoveryService extends CompositeService implements Recovery {
@@ -95,6 +99,7 @@ public class RecoveryService extends Com
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationAttemptId applicationAttemptId;
+ private final OutputCommitter committer;
private final Dispatcher dispatcher;
private final ControlledClock clock;
@@ -108,9 +113,10 @@ public class RecoveryService extends Com
private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationAttemptId applicationAttemptId,
- Clock clock) {
+ Clock clock, OutputCommitter committer) {
super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId;
+ this.committer = committer;
this.dispatcher = new RecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@@ -122,17 +128,17 @@ public class RecoveryService extends Com
// parse the history file
try {
parse();
- if (completedTasks.size() > 0) {
- recoveryMode = true;
- LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " +
- "TO RECOVER " + completedTasks.size());
- LOG.info("Job launch time " + jobInfo.getLaunchTime());
- clock.setTime(jobInfo.getLaunchTime());
- }
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.warn(e);
LOG.warn("Could not parse the old history file. Aborting recovery. "
- + "Starting afresh.");
+ + "Starting afresh.", e);
+ }
+ if (completedTasks.size() > 0) {
+ recoveryMode = true;
+ LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+ + "TO RECOVER " + completedTasks.size());
+ LOG.info("Job launch time " + jobInfo.getLaunchTime());
+ clock.setTime(jobInfo.getLaunchTime());
}
}
@@ -315,6 +321,20 @@ public class RecoveryService extends Com
TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
switch (state) {
case SUCCEEDED:
+ //recover the task output
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
+ attInfo.getAttemptId());
+ try {
+ committer.recoverTask(taskContext);
+ } catch (IOException e) {
+ actualHandler.handle(new JobDiagnosticsUpdateEvent(
+ aId.getTaskId().getJobId(), "Error in recovering task output " +
+ e.getMessage()));
+ actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
+ JobEventType.INTERNAL_ERROR));
+ }
+ LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+
// send the done event
LOG.info("Sending done event to " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
@@ -334,6 +354,16 @@ public class RecoveryService extends Com
return;
}
+ else if (event.getType() ==
+ ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
+ TaskAttemptId aId = ((ContainerLauncherEvent) event)
+ .getTaskAttemptID();
+ actualHandler.handle(
+ new TaskAttemptEvent(aId,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ return;
+ }
+
// delegate to the actual handler
actualHandler.handle(event);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Mon Oct 24 08:44:54 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -264,9 +265,11 @@ public class MRApp extends MRAppMaster {
} catch (IOException e) {
throw new YarnException(e);
}
- Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
- getTaskAttemptListener(), getContext().getClock(),
- currentUser.getUserName());
+ Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+ getDispatcher().getEventHandler(),
+ getTaskAttemptListener(), getContext().getClock(),
+ getCommitter(), isNewApiCommitter(),
+ currentUser.getUserName());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@@ -413,13 +416,15 @@ public class MRApp extends MRAppMaster {
return localStateMachine;
}
- public TestJob(Configuration conf, ApplicationId applicationId,
- EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
- Clock clock, String user) {
- super(getApplicationAttemptId(applicationId, getStartCount()), conf,
- eventHandler, taskAttemptListener, new JobTokenSecretManager(),
- new Credentials(), clock, getCompletedTaskFromPreviousRun(), metrics,
- user, System.currentTimeMillis(), getAllAMInfos());
+ public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+ Configuration conf, EventHandler eventHandler,
+ TaskAttemptListener taskAttemptListener, Clock clock,
+ OutputCommitter committer, boolean newApiCommitter, String user) {
+ super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
+ conf, eventHandler, taskAttemptListener,
+ new JobTokenSecretManager(), new Credentials(), clock,
+ getCompletedTaskFromPreviousRun(), metrics, committer,
+ newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
// This "this leak" is okay because the retained pointer is in an
// instance variable.
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon Oct 24 08:44:54 2011
@@ -342,10 +342,10 @@ public class TestRMContainerAllocator {
public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
int numMaps, int numReduces) {
- super(appAttemptID, conf, null, null, null, null, null, null, null, null,
- System.currentTimeMillis(), null);
- this.jobId = MRBuilderUtils
- .newJobId(appAttemptID.getApplicationId(), 0);
+ // Job id (job number 0 of this attempt's application) is passed as the
+ // first superclass argument; this.jobId below just reads it back.
+ // Remaining arguments follow the same positional order as the super call
+ // in MRApp.TestJob: attempt id, conf, eight null collaborators (event
+ // handler through committer), newApiCommitter=true, user=null, start time,
+ // and null AM infos.
+ super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
+ appAttemptID, conf, null, null, null, null, null, null, null, null,
+ true, null, System.currentTimeMillis(), null);
+ this.jobId = getID();
this.numMaps = numMaps;
this.numReduces = numReduces;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Mon Oct 24 08:44:54 2011
@@ -18,6 +18,9 @@
package org.apache.hadoop.mapreduce.v2.app;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.Iterator;
import junit.framework.Assert;
@@ -25,10 +28,21 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@@ -37,20 +51,34 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+  // Job output directory shared by both AM runs of a recovery test.
+  private static final Path outputDir = new Path(new File("target",
+      TestRecovery.class.getName()).getAbsolutePath() +
+      Path.SEPARATOR + "out");
+  // Part file that validateOutput() reads from outputDir.
+  private static final String partFile = "part-r-00000";
+  // Fixture keys/values written by writeOutput() and checked by validateOutput().
+  private final Text key1 = new Text("key1");
+  private final Text key2 = new Text("key2");
+  private final Text val1 = new Text("val1");
+  private final Text val2 = new Text("val2");
+
@Test
public void testCrashed() throws Exception {
+
int runCount = 0;
long am1StartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
long jobStartTime = job.getReport().getStartTime();
@@ -135,6 +163,9 @@ public class TestRecovery {
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
@@ -201,7 +232,165 @@ public class TestRecovery {
// TODO Add verification of additional data from jobHistory - whatever was
// available in the failed attempt should be available here
}
+
+ /**
+ * Checks that task output committed before an AM restart survives recovery.
+ * Run 1 finishes the map and the first reduce, writes and commits real
+ * output for that reduce, then stops the AM. Run 2 has recovery enabled and
+ * does not clean on start. It must report the map and the first reduce as
+ * SUCCEEDED without a TA_DONE being sent, run the second reduce to
+ * completion, and leave the committed part file with the expected content.
+ */
+ @Test
+ public void testOutputRecovery() throws Exception {
+ int runCount = 0;
+ // 1 map, 2 reduces, no auto-complete, clean state on this first start.
+ MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+ true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ // NOTE(review): assumes getTasks() iterates the map first, then the
+ // reduces in order -- confirm the ordering guarantee of that map.
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask1 = it.next();
+ Task reduceTask1 = it.next();
+
+ // the single map must be running
+ app.waitForState(mapTask1, TaskState.RUNNING);
+
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+ .next();
+
+ //before sending the TA_DONE, event make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+ //send the done signal to the map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for map task to complete
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(reduceTask1, TaskState.RUNNING);
+ TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+ // write and commit real output on behalf of reduce1; only this reduce
+ // produces output, which validateOutput() checks at the end
+ writeOutput(reduce1Attempt1, conf);
+
+ //send the done signal to the 1st reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for first reduce task to complete
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ //stop the app before the job completes.
+ app.stop();
+
+ //rerun
+ //in the rerun, the map and the first reduce are recovered from the
+ //previous run (recovery enabled, no clean on start)
+ app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask1 = it.next();
+ reduceTask1 = it.next();
+ Task reduceTask2 = it.next();
+
+ // map will be recovered, no need to send done
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // first reduce will be recovered, no need to send done
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(reduceTask2, TaskState.RUNNING);
+
+ TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+ .iterator().next();
+ //before sending the TA_DONE, event make sure attempt has come to
+ //RUNNING state
+ app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+
+ //send the done signal to the 2nd reduce task
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce2Attempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait to get it completed
+ app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ // reduce1's output, committed in run 1, must still be intact
+ validateOutput();
+ }
+
+  /**
+   * Writes a fixed set of records on behalf of {@code attempt} through a
+   * new-API {@link TextOutputFormat}. It then commits them with the committer
+   * of the output format configured in {@code conf}, as a finished reduce
+   * attempt would do for its own output.
+   *
+   * @param attempt the task attempt whose output is being simulated
+   * @param conf job configuration, expected to carry the output directory
+   * @throws Exception if writing, closing or committing the output fails, or
+   *         the configured output format class cannot be loaded
+   */
+  private void writeOutput(TaskAttempt attempt, Configuration conf)
+      throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+        TypeConverter.fromYarn(attempt.getID()));
+
+    // Parameterized with Object so that keys and values of different Writable
+    // types (and nulls) can be written without resorting to raw types.
+    TextOutputFormat<Object, Object> theOutputFormat =
+        new TextOutputFormat<Object, Object>();
+    RecordWriter<Object, Object> theRecordWriter =
+        theOutputFormat.getRecordWriter(tContext);
+
+    NullWritable nullWritable = NullWritable.get();
+    try {
+      // Deliberate mix of null / NullWritable keys and values; the expected
+      // rendering of each record is spelled out in validateOutput().
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(tContext);
+    }
+
+    // Commit the attempt's output so it becomes visible under the job output
+    // directory, where validateOutput() later reads it.
+    OutputFormat<?, ?> outputFormat = ReflectionUtils.newInstance(
+        tContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+    committer.commitTask(tContext);
+  }
+
+  /**
+   * Asserts that the committed part file under {@code outputDir} contains
+   * exactly the records written by writeOutput(). The expected text is one
+   * line per record, with a tab between key and value. A null or
+   * NullWritable side is omitted, and a record with neither side produces no
+   * line.
+   *
+   * @throws IOException if the part file cannot be read
+   */
+  private void validateOutput() throws IOException {
+    File outputFile = new File(new Path(outputDir, partFile).toString());
+    StringBuilder expectedOutput = new StringBuilder();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = slurp(outputFile);
+    // assertEquals takes (message, expected, actual) so a failure reports
+    // the two sides the right way round.
+    Assert.assertEquals("Unexpected content in " + outputFile,
+        expectedOutput.toString(), output);
+  }
+
+  /**
+   * Reads the entire contents of {@code f} and decodes them as UTF-8.
+   *
+   * @param f the file to read
+   * @return the file's contents as a string
+   * @throws IOException if the file cannot be opened or read, or if it ends
+   *         before {@code f.length()} bytes have been read
+   */
+  public static String slurp(File f) throws IOException {
+    // Assumes the file is smaller than 2 GB (its length is cast to int).
+    int len = (int) f.length();
+    byte[] buf = new byte[len];
+    FileInputStream in = new FileInputStream(f);
+    try {
+      // A single read() may return fewer bytes than requested, so keep
+      // reading until the buffer is full instead of trusting one call.
+      int off = 0;
+      while (off < len) {
+        int n = in.read(buf, off, len - off);
+        if (n < 0) {
+          throw new IOException("Unexpected end of file " + f + " after "
+              + off + " of " + len + " bytes");
+        }
+        off += n;
+      }
+    } finally {
+      in.close();
+    }
+    return new String(buf, "UTF-8");
+  }
+
+
class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Oct 24 08:44:54 2011
@@ -449,6 +449,8 @@ public interface MRJobConfig {
public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
+ // Name of the environment variable carrying the application attempt id
+ // (presumably exported to task JVMs by the AM -- confirm at the setter).
+ public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+
// This should be the directory where splits file gets localized on the node
// running ApplicationMaster.
public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Mon Oct 24 08:44:54 2011
@@ -25,7 +25,6 @@ import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log;
@@ -72,6 +71,7 @@ class EventWriter {
+ /**
+ * Flushes the encoder's buffered events and then the underlying stream.
+ * hflush() is meant to make the written bytes visible to readers while the
+ * file is still open. Presumably this lets a restarted AM read the history
+ * written so far; confirm the stream type of {@code out}.
+ */
void flush() throws IOException {
encoder.flush();
out.flush();
+ // NOTE(review): hflush() likely subsumes the flush() above; the extra
+ // call looks redundant but harmless.
+ out.hflush();
}
void close() throws IOException {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java?rev=1188044&r1=1188043&r2=1188044&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java Mon Oct 24 08:44:54 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.lib.output;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobContext;
@@ -56,6 +58,17 @@ public class NullOutputFormat<K, V> exte
}
public void setupJob(JobContext jobContext) { }
public void setupTask(TaskAttemptContext taskContext) { }
+
+ /**
+ * Always true. This committer's setupJob/setupTask are no-ops and
+ * recoverTask does nothing, so there is no per-task state to restore
+ * after an application master restart.
+ */
+ @Override
+ public boolean isRecoverySupported() {
+ return true;
+ }
+
+ /**
+ * No-op, mirroring setupTask(): this committer keeps no task output, so
+ * recovering a task from an earlier application attempt requires no work.
+ */
+ @Override
+ public void recoverTask(TaskAttemptContext taskContext)
+ throws IOException {
+ // Nothing to do for recovering the task.
+ }
};
}
}