Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2013/01/04 21:38:37 UTC
svn commit: r1429115 [1/2] - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Author: bobby
Date: Fri Jan 4 20:38:36 2013
New Revision: 1429115
URL: http://svn.apache.org/viewvc?rev=1429115&view=rev
Log:
svn merge -c 1429114 FIXES: MAPREDUCE-4819. AM can rerun job after reporting final job status to the client (bobby and Bikas Saha via bobby)
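The idea behind the fix: before running OutputCommitter.commitJob(), the AM
drops a "start commit" marker file in the job's staging directory, and after
the commit it drops a success or failure marker. A restarted AM checks these
markers first; if a commit had already begun, it only reports the recorded
outcome instead of rerunning the job. A minimal sketch of that protocol,
assuming illustrative marker names rather than the patch's MRApps helper
paths:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class CommitMarkerSketch {
  private final FileSystem fs;
  private final Path start, success, failure; // assumed names under staging

  CommitMarkerSketch(Configuration conf, Path stagingDir) throws IOException {
    fs = FileSystem.get(conf);
    start = new Path(stagingDir, "COMMIT_STARTED");
    success = new Path(stagingDir, "COMMIT_SUCCESS");
    failure = new Path(stagingDir, "COMMIT_FAIL");
  }

  // Commit side: bracket the real commit with marker files.
  void commit(Runnable commitJob) throws IOException {
    fs.create(start, false).close();      // record that the commit began
    try {
      commitJob.run();
      fs.create(success, false).close();  // record the outcome
    } catch (RuntimeException e) {
      fs.create(failure, false).close();
      throw e;
    }
  }

  // Restart side: a new AM attempt must never rerun once this is true.
  boolean commitAlreadyStarted() throws IOException {
    return fs.exists(start);
  }
}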
Added:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
- copied unchanged from r1429114, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java
- copied unchanged from r1429114, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Jan 4 20:38:36 2013
@@ -504,6 +504,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4894. Renewal / cancellation of JobHistory tokens (Siddharth
Seth via tgraves)
+ MAPREDUCE-4819. AM can rerun job after reporting final job status to the
+ client (bobby and Bikas Saha via bobby)
+
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Fri Jan 4 20:38:36 2013
@@ -116,12 +116,15 @@ public class JobHistoryEventHandler exte
*/
@Override
public void init(Configuration conf) {
-
+ String jobId =
+ TypeConverter.fromYarn(context.getApplicationID()).toString();
+
String stagingDirStr = null;
String doneDirStr = null;
String userDoneDirStr = null;
try {
- stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+ stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf,
+ jobId);
doneDirStr =
JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
userDoneDirStr =
@@ -881,7 +884,7 @@ public class JobHistoryEventHandler exte
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
// check if path exists, in case of retries it may not exist
if (stagingDirFS.exists(fromPath)) {
- LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString());
+ LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
// TODO temporarily removing the existing dst
if (doneDirFS.exists(toPath)) {
doneDirFS.delete(toPath, true);
@@ -892,11 +895,9 @@ public class JobHistoryEventHandler exte
if (copied)
LOG.info("Copied to done location: " + toPath);
else
- LOG.info("copy failed");
+ LOG.info("copy failed");
doneDirFS.setPermission(toPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
-
- stagingDirFS.delete(fromPath, false);
}
}
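The change above turns moveToDoneNow() into a pure copy: the staging copy of
the history file is kept so that a later AM attempt (via the new
JobHistoryCopyService) can replay it, and staging cleanup is deferred to the
staging-dir cleaning service. A sketch of the resulting behavior, with
illustrative names rather than the patch's own fields:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

class HistoryCopySketch {
  // Copy a history file to the done dir but keep the staging original so a
  // restarted AM can still replay it.
  static void copyToDone(FileSystem stagingFS, FileSystem doneFS,
      Path from, Path to, Configuration conf) throws IOException {
    if (stagingFS.exists(from)) {
      if (doneFS.exists(to)) {
        doneFS.delete(to, true);  // clobber a stale destination from a retry
      }
      FileUtil.copy(stagingFS, from, doneFS, to, false /* deleteSource */, conf);
      // Deliberately no stagingFS.delete(from): cleanup happens later, in the
      // staging-dir cleaning service, after history has been flushed.
    }
  }
}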
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Jan 4 20:38:36 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -66,6 +67,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -91,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
@@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
@@ -179,9 +183,13 @@ public class MRAppMaster extends Composi
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
- private UserGroupInformation currentUser; // Will be setup during init
+ protected UserGroupInformation currentUser; // Will be setup during init
private volatile boolean isLastAMRetry = false;
+ //Something happened and we should shut down right after we start up.
+ boolean errorHappenedShutDown = false;
+ private String shutDownMessage = null;
+ JobStateInternal forcedState = null;
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
@@ -242,94 +250,175 @@ public class MRAppMaster extends Composi
newApiCommitter = true;
LOG.info("Using mapred newApiCommitter.");
}
-
- committer = createOutputCommitter(conf);
- boolean recoveryEnabled = conf.getBoolean(
- MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
- boolean recoverySupportedByCommitter = committer.isRecoverySupported();
- if (recoveryEnabled && recoverySupportedByCommitter
- && appAttemptID.getAttemptId() > 1) {
- LOG.info("Recovery is enabled. "
- + "Will try to recover from previous life on best effort basis.");
- recoveryServ = createRecoveryService(context);
- addIfService(recoveryServ);
- dispatcher = recoveryServ.getDispatcher();
- clock = recoveryServ.getClock();
- inRecovery = true;
- } else {
- LOG.info("Not starting RecoveryService: recoveryEnabled: "
- + recoveryEnabled + " recoverySupportedByCommitter: "
- + recoverySupportedByCommitter + " ApplicationAttemptID: "
- + appAttemptID.getAttemptId());
+
+ boolean copyHistory = false;
+ try {
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ FileSystem fs = getFileSystem(conf);
+ boolean stagingExists = fs.exists(stagingDir);
+ Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+ boolean commitStarted = fs.exists(startCommitFile);
+ Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
+ boolean commitSuccess = fs.exists(endCommitSuccessFile);
+ Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
+ boolean commitFailure = fs.exists(endCommitFailureFile);
+ if(!stagingExists) {
+ isLastAMRetry = true;
+ errorHappenedShutDown = true;
+ forcedState = JobStateInternal.ERROR;
+ shutDownMessage = "Staging dir does not exist " + stagingDir;
+ LOG.fatal(shutDownMessage);
+ } else if (commitStarted) {
+ //A commit was started so this is the last time, we just need to know
+ // what result we will use to notify, and how we will unregister
+ errorHappenedShutDown = true;
+ isLastAMRetry = true;
+ copyHistory = true;
+ if (commitSuccess) {
+ shutDownMessage = "We crashed after successfully committing. Recovering.";
+ forcedState = JobStateInternal.SUCCEEDED;
+ } else if (commitFailure) {
+ shutDownMessage = "We crashed after a commit failure.";
+ forcedState = JobStateInternal.FAILED;
+ } else {
+ //The commit is still pending; treat it as a commit error
+ shutDownMessage = "We crashed during a commit";
+ forcedState = JobStateInternal.ERROR;
+ }
+ }
+ } catch (IOException e) {
+ throw new YarnException("Error while initializing", e);
+ }
+
+ if (errorHappenedShutDown) {
dispatcher = createDispatcher();
addIfService(dispatcher);
- }
+
+ EventHandler<JobHistoryEvent> historyService = null;
+ if (copyHistory) {
+ historyService =
+ createJobHistoryHandler(context);
+ dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+ historyService);
+ }
+ NoopEventHandler eater = new NoopEventHandler();
+ //We do not have a JobEventDispatcher in this path
+ dispatcher.register(JobEventType.class, eater);
+
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ containerAllocator = createContainerAllocator(null, context);
+ addIfService(containerAllocator);
+ dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+ if (copyHistory) {
+ // Add the staging directory cleaner before the history server but after
+ // the container allocator so the staging directory is cleaned after
+ // the history has been flushed but before unregistering with the RM.
+ addService(createStagingDirCleaningService());
+
+ // Add the JobHistoryEventHandler last so that it is properly stopped first.
+ // This will guarantee that all history-events are flushed before AM goes
+ // ahead with shutdown.
+ // Note: Even though JobHistoryEventHandler is started last, if any
+ // component creates a JobHistoryEvent in the meantime, it will just be
+ // queued inside the JobHistoryEventHandler
+ addIfService(historyService);
+
- //service to handle requests from JobClient
- clientService = createClientService(context);
- addIfService(clientService);
-
- containerAllocator = createContainerAllocator(clientService, context);
-
- //service to handle requests to TaskUmbilicalProtocol
- taskAttemptListener = createTaskAttemptListener(context);
- addIfService(taskAttemptListener);
-
- //service to handle the output committer
- committerEventHandler = createCommitterEventHandler(context, committer);
- addIfService(committerEventHandler);
+ JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
+ dispatcher.getEventHandler());
+ addIfService(cpHist);
+ }
+ } else {
+ committer = createOutputCommitter(conf);
+ boolean recoveryEnabled = conf.getBoolean(
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+ if (recoveryEnabled && recoverySupportedByCommitter
+ && appAttemptID.getAttemptId() > 1) {
+ LOG.info("Recovery is enabled. "
+ + "Will try to recover from previous life on best effort basis.");
+ recoveryServ = createRecoveryService(context);
+ addIfService(recoveryServ);
+ dispatcher = recoveryServ.getDispatcher();
+ clock = recoveryServ.getClock();
+ inRecovery = true;
+ } else {
+ LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ + recoveryEnabled + " recoverySupportedByCommitter: "
+ + recoverySupportedByCommitter + " ApplicationAttemptID: "
+ + appAttemptID.getAttemptId());
+ dispatcher = createDispatcher();
+ addIfService(dispatcher);
+ }
+
+ //service to handle requests from JobClient
+ clientService = createClientService(context);
+ addIfService(clientService);
+
+ containerAllocator = createContainerAllocator(clientService, context);
+
+ //service to handle the output committer
+ committerEventHandler = createCommitterEventHandler(context, committer);
+ addIfService(committerEventHandler);
+
+ //service to handle requests to TaskUmbilicalProtocol
+ taskAttemptListener = createTaskAttemptListener(context);
+ addIfService(taskAttemptListener);
- //service to log job history events
- EventHandler<JobHistoryEvent> historyService =
+ //service to log job history events
+ EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context);
- dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
- historyService);
+ dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+ historyService);
- this.jobEventDispatcher = new JobEventDispatcher();
+ this.jobEventDispatcher = new JobEventDispatcher();
- //register the event dispatchers
- dispatcher.register(JobEventType.class, jobEventDispatcher);
- dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
- dispatcher.register(TaskAttemptEventType.class,
- new TaskAttemptEventDispatcher());
- dispatcher.register(CommitterEventType.class, committerEventHandler);
-
- if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
- || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
- //optional service to speculate on task attempts' progress
- speculator = createSpeculator(conf, context);
- addIfService(speculator);
- }
-
- speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
- dispatcher.register(Speculator.EventType.class,
- speculatorEventDispatcher);
-
- // service to allocate containers from RM (if non-uber) or to fake it (uber)
- addIfService(containerAllocator);
- dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
-
- // corresponding service to launch allocated containers via NodeManager
- containerLauncher = createContainerLauncher(context);
- addIfService(containerLauncher);
- dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
-
- // Add the staging directory cleaner before the history server but after
- // the container allocator so the staging directory is cleaned after
- // the history has been flushed but before unregistering with the RM.
- addService(createStagingDirCleaningService());
-
- // Add the JobHistoryEventHandler last so that it is properly stopped first.
- // This will guarantee that all history-events are flushed before AM goes
- // ahead with shutdown.
- // Note: Even though JobHistoryEventHandler is started last, if any
- // component creates a JobHistoryEvent in the meanwhile, it will be just be
- // queued inside the JobHistoryEventHandler
- addIfService(historyService);
+ //register the event dispatchers
+ dispatcher.register(JobEventType.class, jobEventDispatcher);
+ dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+ dispatcher.register(TaskAttemptEventType.class,
+ new TaskAttemptEventDispatcher());
+ dispatcher.register(CommitterEventType.class, committerEventHandler);
+
+ if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+ || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+ //optional service to speculate on task attempts' progress
+ speculator = createSpeculator(conf, context);
+ addIfService(speculator);
+ }
+ speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
+ dispatcher.register(Speculator.EventType.class,
+ speculatorEventDispatcher);
+
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ addIfService(containerAllocator);
+ dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+ // corresponding service to launch allocated containers via NodeManager
+ containerLauncher = createContainerLauncher(context);
+ addIfService(containerLauncher);
+ dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+
+ // Add the staging directory cleaner before the history server but after
+ // the container allocator so the staging directory is cleaned after
+ // the history has been flushed but before unregistering with the RM.
+ addService(createStagingDirCleaningService());
+
+ // Add the JobHistoryEventHandler last so that it is properly stopped first.
+ // This will guarantee that all history-events are flushed before AM goes
+ // ahead with shutdown.
+ // Note: Even though JobHistoryEventHandler is started last, if any
+ // component creates a JobHistoryEvent in the meantime, it will just be
+ // queued inside the JobHistoryEventHandler
+ addIfService(historyService);
+ }
+
super.init(conf);
} // end of init()
-
+
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
@@ -489,15 +578,20 @@ public class MRAppMaster extends Composi
appContext.getClock(), getCommitter());
}
- /** Create and initialize (but don't start) a single job. */
- protected Job createJob(Configuration conf) {
+ /** Create and initialize (but don't start) a single job.
+ * @param forcedState a state to force the job into or null for normal operation.
+ * @param diagnostic a diagnostic message to include with the job.
+ */
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
// create single job
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, newApiCommitter,
- currentUser.getUserName(), appSubmitTime, amInfos, context);
+ currentUser.getUserName(), appSubmitTime, amInfos, context,
+ forcedState, diagnostic);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@@ -874,7 +968,7 @@ public class MRAppMaster extends Composi
amInfos.add(amInfo);
// /////////////////// Create the job itself.
- job = createJob(getConfig());
+ job = createJob(getConfig(), forcedState, shutDownMessage);
// End of creating the job.
@@ -891,31 +985,33 @@ public class MRAppMaster extends Composi
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
- // create a job event for job intialization
- JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
- // Send init to the job (this does NOT trigger job execution)
- // This is a synchronous call, not an event through dispatcher. We want
- // job-init to be done completely here.
- jobEventDispatcher.handle(initJobEvent);
-
-
- // JobImpl's InitTransition is done (call above is synchronous), so the
- // "uber-decision" (MR-1220) has been made. Query job and switch to
- // ubermode if appropriate (by registering different container-allocator
- // and container-launcher services/event-handlers).
-
- if (job.isUber()) {
- speculatorEventDispatcher.disableSpeculation();
- LOG.info("MRAppMaster uberizing job " + job.getID()
- + " in local container (\"uber-AM\") on node "
- + nmHost + ":" + nmPort + ".");
- } else {
- // send init to speculator only for non-uber jobs.
- // This won't yet start as dispatcher isn't started yet.
- dispatcher.getEventHandler().handle(
- new SpeculatorEvent(job.getID(), clock.getTime()));
- LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
- + "job " + job.getID() + ".");
+ if (!errorHappenedShutDown) {
+ // create a job event for job initialization
+ JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+ // Send init to the job (this does NOT trigger job execution)
+ // This is a synchronous call, not an event through dispatcher. We want
+ // job-init to be done completely here.
+ jobEventDispatcher.handle(initJobEvent);
+
+
+ // JobImpl's InitTransition is done (call above is synchronous), so the
+ // "uber-decision" (MR-1220) has been made. Query job and switch to
+ // ubermode if appropriate (by registering different container-allocator
+ // and container-launcher services/event-handlers).
+
+ if (job.isUber()) {
+ speculatorEventDispatcher.disableSpeculation();
+ LOG.info("MRAppMaster uberizing job " + job.getID()
+ + " in local container (\"uber-AM\") on node "
+ + nmHost + ":" + nmPort + ".");
+ } else {
+ // send init to speculator only for non-uber jobs.
+ // This won't yet start as dispatcher isn't started yet.
+ dispatcher.getEventHandler().handle(
+ new SpeculatorEvent(job.getID(), clock.getTime()));
+ LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ + "job " + job.getID() + ".");
+ }
}
//start all the components
@@ -1062,6 +1158,17 @@ public class MRAppMaster extends Composi
}
+ /**
+ * Eats events that are not needed in some error cases.
+ */
+ private static class NoopEventHandler implements EventHandler<Event> {
+
+ @Override
+ public void handle(Event event) {
+ //Empty
+ }
+ }
+
private static void validateInputParam(String value, String param)
throws IOException {
if (value == null) {
@@ -1158,6 +1265,9 @@ public class MRAppMaster extends Composi
public Object run() throws Exception {
appMaster.init(conf);
appMaster.start();
+ if(appMaster.errorHappenedShutDown) {
+ throw new IOException("Was asked to shut down.");
+ }
return null;
}
});
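Condensing the new init() branching above: the AM first probes the staging
directory and the commit marker files, and only falls through to the normal
service wiring (recovery, client service, task listeners, committer) when no
commit was in flight. A hedged summary of that decision as a standalone
helper; this is an editorial condensation, not code from the patch:

import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;

class StartupDecisionSketch {
  // Returning null means "no forced state: take the normal startup path".
  static JobStateInternal decide(boolean stagingExists, boolean commitStarted,
      boolean commitSuccess, boolean commitFailure) {
    if (!stagingExists) return JobStateInternal.ERROR;     // nothing to work with
    if (!commitStarted) return null;                       // commit never began
    if (commitSuccess)  return JobStateInternal.SUCCEEDED; // crashed after success
    if (commitFailure)  return JobStateInternal.FAILED;    // crashed after failure
    return JobStateInternal.ERROR;                         // crashed mid-commit
  }
}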
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Fri Jan 4 20:38:36 2013
@@ -29,8 +29,13 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
@@ -40,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -64,6 +71,11 @@ public class CommitterEventHandler exten
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
private long commitWindowMs;
+ private FileSystem fs;
+ private Path startCommitFile;
+ private Path endCommitSuccessFile;
+ private Path endCommitFailureFile;
+
public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler) {
@@ -82,10 +94,21 @@ public class CommitterEventHandler exten
MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
+ try {
+ fs = FileSystem.get(conf);
+ JobID id = TypeConverter.fromYarn(context.getApplicationID());
+ JobId jobId = TypeConverter.toYarn(id);
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+ endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
+ endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
}
@Override
- public void start() {
+ public void start() {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("CommitterEvent Processor #%d")
.build();
@@ -199,7 +222,7 @@ public class CommitterEventHandler exten
+ event.toString());
}
}
-
+
@SuppressWarnings("unchecked")
protected void handleJobSetup(CommitterJobSetupEvent event) {
try {
@@ -213,19 +236,30 @@ public class CommitterEventHandler exten
}
}
+ private void touchz(Path p) throws IOException {
+ fs.create(p, false).close();
+ }
+
@SuppressWarnings("unchecked")
protected void handleJobCommit(CommitterJobCommitEvent event) {
try {
+ touchz(startCommitFile);
jobCommitStarted();
waitForValidCommitWindow();
committer.commitJob(event.getJobContext());
+ touchz(endCommitSuccessFile);
context.getEventHandler().handle(
new JobCommitCompletedEvent(event.getJobID()));
} catch (Exception e) {
- LOG.error("Could not commit job", e);
- context.getEventHandler().handle(
- new JobCommitFailedEvent(event.getJobID(),
- StringUtils.stringifyException(e)));
+ try {
+ touchz(endCommitFailureFile);
+ } catch (Exception e2) {
+ LOG.error("could not create failure file.", e2);
+ }
+ LOG.error("Could not commit job", e);
+ context.getEventHandler().handle(
+ new JobCommitFailedEvent(event.getJobID(),
+ StringUtils.stringifyException(e)));
} finally {
jobCommitEnded();
}
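The touchz() helper above leans on FileSystem.create(p, false), which refuses
to overwrite an existing file. Creating the start-commit marker this way
therefore also acts as a crude lock: a second attempt racing into commitJob()
fails at the marker instead of committing twice. A minimal standalone
illustration:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class TouchzSketch {
  // Atomically create an empty marker file; throws if it already exists,
  // which is exactly the property the start-commit marker relies on.
  static void touchz(FileSystem fs, Path p) throws IOException {
    fs.create(p, false /* overwrite */).close();
  }
}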
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Jan 4 20:38:36 2013
@@ -540,6 +540,8 @@ public class JobImpl implements org.apac
private Credentials fsTokens;
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
+
+ private JobStateInternal forcedState = null;
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
@@ -548,7 +550,8 @@ public class JobImpl implements org.apac
Credentials fsTokenCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
boolean newApiCommitter, String userName,
- long appSubmitTime, List<AMInfo> amInfos, AppContext appContext) {
+ long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
+ JobStateInternal forcedState, String forcedDiagnostic) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
@@ -579,6 +582,10 @@ public class JobImpl implements org.apac
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
+ this.forcedState = forcedState;
+ if(forcedDiagnostic != null) {
+ this.diagnostics.add(forcedDiagnostic);
+ }
}
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -818,7 +825,7 @@ public class JobImpl implements org.apac
public JobState getState() {
readLock.lock();
try {
- return getExternalState(getStateMachine().getCurrentState());
+ return getExternalState(getInternalState());
} finally {
readLock.unlock();
}
@@ -868,6 +875,9 @@ public class JobImpl implements org.apac
public JobStateInternal getInternalState() {
readLock.lock();
try {
+ if(forcedState != null) {
+ return forcedState;
+ }
return getStateMachine().getCurrentState();
} finally {
readLock.unlock();
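Together, the two JobImpl changes above route every external state query
through getInternalState(), so a non-null forcedState pins the job's reported
state without ever running the state machine. The delegation pattern, reduced
to its essentials with generic names (not the patch's own types):

enum State { RUNNING, SUCCEEDED, FAILED, ERROR }

class ForcedStateSketch {
  private volatile State forcedState;      // set once during recovery, else null
  private State machineState = State.RUNNING;

  State getInternalState() {
    return (forcedState != null) ? forcedState : machineState;
  }

  State getExternalState() {
    return getInternalState();             // was: getStateMachine().getCurrentState()
  }
}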
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Fri Jan 4 20:38:36 2013
@@ -205,18 +205,18 @@ public class RecoveryService extends Com
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
- String jobName =
+ String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
- JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+ JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
- jobName, (applicationAttemptId.getAttemptId() - 1)));
+ jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Jan 4 20:38:36 2013
@@ -141,14 +141,19 @@ public abstract class RMCommunicator ext
protected void register() {
//Register
- InetSocketAddress serviceAddr = clientService.getBindAddress();
+ InetSocketAddress serviceAddr = null;
+ if (clientService != null ) {
+ serviceAddr = clientService.getBindAddress();
+ }
try {
RegisterApplicationMasterRequest request =
recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
- request.setHost(serviceAddr.getHostName());
- request.setRpcPort(serviceAddr.getPort());
- request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+ if (serviceAddr != null) {
+ request.setHost(serviceAddr.getHostName());
+ request.setRpcPort(serviceAddr.getPort());
+ request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+ }
RegisterApplicationMasterResponse response =
scheduler.registerApplicationMaster(request);
minContainerCapability = response.getMinimumResourceCapability();
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Fri Jan 4 20:38:36 2013
@@ -18,14 +18,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static junit.framework.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
@@ -51,7 +46,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
public class TestJobHistoryEventHandler {
@@ -260,13 +254,15 @@ public class TestJobHistoryEventHandler
}
}
- private AppContext mockAppContext(JobId jobId) {
+ private AppContext mockAppContext(ApplicationId appId) {
+ JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
AppContext mockContext = mock(AppContext.class);
Job mockJob = mock(Job.class);
when(mockJob.getTotalMaps()).thenReturn(10);
when(mockJob.getTotalReduces()).thenReturn(10);
when(mockJob.getName()).thenReturn("mockjob");
when(mockContext.getJob(jobId)).thenReturn(mockJob);
+ when(mockContext.getApplicationID()).thenReturn(appId);
return mockContext;
}
@@ -279,7 +275,7 @@ public class TestJobHistoryEventHandler
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
- AppContext mockAppContext = mockAppContext(jobId);
+ AppContext mockAppContext = mockAppContext(appId);
}
private JobHistoryEvent getEventToEnqueue(JobId jobId) {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Jan 4 20:38:36 2013
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.JobContext;
@@ -208,6 +209,16 @@ public class MRApp extends MRAppMaster {
@Override
public void init(Configuration conf) {
+ try {
+ //Create the staging directory if it does not exist
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ FileSystem fs = getFileSystem(conf);
+ fs.mkdirs(stagingDir);
+ } catch (Exception e) {
+ throw new YarnException("Error creating staging dir", e);
+ }
+
super.init(conf);
if (this.clusterInfo != null) {
getContext().getClusterInfo().setMinContainerCapability(
@@ -388,7 +399,8 @@ public class MRApp extends MRAppMaster {
}
@Override
- protected Job createJob(Configuration conf) {
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
@@ -398,7 +410,8 @@ public class MRApp extends MRAppMaster {
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext());
+ isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@@ -631,13 +644,14 @@ public class MRApp extends MRAppMaster {
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
- boolean newApiCommitter, String user, AppContext appContext) {
+ boolean newApiCommitter, String user, AppContext appContext,
+ JobStateInternal forcedState, String diagnostic) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
- appContext);
+ appContext, forcedState, diagnostic);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Fri Jan 4 20:38:36 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
@@ -370,8 +371,9 @@ public class TestMRApp {
}
@Override
- protected Job createJob(Configuration conf) {
- spiedJob = spy((JobImpl) super.createJob(conf));
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
+ spiedJob = spy((JobImpl) super.createJob(conf, forcedState, diagnostic));
((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
return spiedJob;
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Fri Jan 4 20:38:36 2013
@@ -17,28 +17,63 @@
*/
package org.apache.hadoop.mapreduce.v2.app;
-import java.io.IOException;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
-import junit.framework.Assert;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestMRAppMaster {
+ private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
+ static String stagingDir = "staging/";
+
+ @BeforeClass
+ public static void setup() {
+ //Do not error out if metrics are inited multiple times
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ File dir = new File(stagingDir);
+ stagingDir = dir.getAbsolutePath();
+ }
+
+ @Before
+ public void cleanup() throws IOException {
+ File dir = new File(stagingDir);
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ dir.mkdirs();
+ }
+
@Test
public void testMRAppMasterForDifferentUser() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
String containerIdStr = "container_1317529182569_0004_000001_1";
- String stagingDir = "/tmp/staging";
+
String userName = "TestAppMasterUser";
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
@@ -49,34 +84,208 @@ public class TestMRAppMaster {
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
- Assert.assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+ assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+ ".staging", appMaster.stagingDirPath.toString());
}
+
+ @Test
+ public void testMRAppMasterMidLock() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+ Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+ FileSystem fs = FileSystem.get(conf);
+ //Create the file, but no end file so we should unregister with an error.
+ fs.create(start).close();
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMaster appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false);
+ boolean caught = false;
+ try {
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ } catch (IOException e) {
+ //The IO Exception is expected
+ LOG.info("Caught expected Exception", e);
+ caught = true;
+ }
+ assertTrue(caught);
+ assertTrue(appMaster.errorHappenedShutDown);
+ assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+ appMaster.stop();
+ }
+
+ @Test
+ public void testMRAppMasterSuccessLock() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+ Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+ Path end = MRApps.getEndJobCommitSuccessFile(conf, userName, jobId);
+ FileSystem fs = FileSystem.get(conf);
+ fs.create(start).close();
+ fs.create(end).close();
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMaster appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false);
+ boolean caught = false;
+ try {
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ } catch (IOException e) {
+ //The IO Exception is expected
+ LOG.info("Caught expected Exception", e);
+ caught = true;
+ }
+ assertTrue(caught);
+ assertTrue(appMaster.errorHappenedShutDown);
+ assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
+ appMaster.stop();
+ }
+
+ @Test
+ public void testMRAppMasterFailLock() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+ Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+ Path end = MRApps.getEndJobCommitFailureFile(conf, userName, jobId);
+ FileSystem fs = FileSystem.get(conf);
+ fs.create(start).close();
+ fs.create(end).close();
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMaster appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false);
+ boolean caught = false;
+ try {
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ } catch (IOException e) {
+ //The IO Exception is expected
+ LOG.info("Caught expected Exception", e);
+ caught = true;
+ }
+ assertTrue(caught);
+ assertTrue(appMaster.errorHappenedShutDown);
+ assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
+ appMaster.stop();
+ }
+
+ @Test
+ public void testMRAppMasterMissingStaging() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+
+ //Delete the staging directory
+ File dir = new File(stagingDir);
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMaster appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis(), false);
+ boolean caught = false;
+ try {
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ } catch (IOException e) {
+ //The IO Exception is expected
+ LOG.info("Caught expected Exception", e);
+ caught = true;
+ }
+ assertTrue(caught);
+ assertTrue(appMaster.errorHappenedShutDown);
+ //Copying the history file is disabled, but it is not really visible from
+ //here
+ assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+ appMaster.stop();
+ }
}
class MRAppMasterTest extends MRAppMaster {
Path stagingDirPath;
private Configuration conf;
+ private boolean overrideInitAndStart;
+ ContainerAllocator mockContainerAllocator;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
long submitTime) {
+ this(applicationAttemptId, containerId, host, port, httpPort, submitTime,
+ true);
+ }
+ public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId, String host, int port, int httpPort,
+ long submitTime, boolean overrideInitAndStart) {
super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
+ this.overrideInitAndStart = overrideInitAndStart;
+ mockContainerAllocator = mock(ContainerAllocator.class);
}
@Override
public void init(Configuration conf) {
- this.conf = conf;
+ if (overrideInitAndStart) {
+ this.conf = conf;
+ } else {
+ super.init(conf);
+ }
+ }
+
+ @Override
+ protected void downloadTokensAndSetupUGI(Configuration conf) {
+ try {
+ this.currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ @Override
+ protected ContainerAllocator createContainerAllocator(
+ final ClientService clientService, final AppContext context) {
+ return mockContainerAllocator;
}
@Override
public void start() {
- try {
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
- stagingDirPath = MRApps.getStagingAreaDir(conf, user);
- } catch (Exception e) {
- Assert.fail(e.getMessage());
+ if (overrideInitAndStart) {
+ try {
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ stagingDirPath = MRApps.getStagingAreaDir(conf, user);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ } else {
+ super.start();
}
}
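
For illustration, a minimal sketch (not part of this patch) of how the two modes of the MRAppMasterTest double above are meant to be used; attemptId and containerId are assumed to be valid records built elsewhere in the test:

    // overrideInitAndStart defaults to true: init() only stashes the conf and
    // start() only resolves the staging dir, so no real services come up
    MRAppMasterTest stubbed = new MRAppMasterTest(attemptId, containerId,
        "host", -1, -1, System.currentTimeMillis());

    // passing false delegates to super.init()/super.start(), which is what
    // lets the staging-dir failure tests above exercise the real shutdown path
    MRAppMasterTest real = new MRAppMasterTest(attemptId, containerId,
        "host", -1, -1, System.currentTimeMillis(), false);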
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Jan 4 20:38:36 2013
@@ -38,11 +38,13 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -72,6 +74,10 @@ import org.junit.Test;
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ //Staging Dir exists
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
@@ -93,6 +99,10 @@ import org.junit.Test;
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ //Staging Dir exists
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
@@ -118,6 +128,10 @@ import org.junit.Test;
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ //Staging Dir exists
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(1);
@@ -198,7 +212,8 @@ import org.junit.Test;
}
@Override
- protected Job createJob(Configuration conf) {
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
@@ -208,7 +223,8 @@ import org.junit.Test;
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext());
+ isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
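
The three setUp hunks above repeat one stubbing pattern: because the AM now probes the staging directory before acting on it, the mocked FileSystem must report that the directory exists. A minimal sketch of the pattern, assuming the Mockito statics used elsewhere in this file are imported and conf is already populated:

    FileSystem fs = mock(FileSystem.class);
    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    Path stagingDir = MRApps.getStagingAreaDir(conf, user);
    // without this stub the AM would conclude the staging dir is already gone
    when(fs.exists(stagingDir)).thenReturn(true);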
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java Fri Jan 4 20:38:36 2013
@@ -36,16 +36,85 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestCommitterEventHandler {
+ public static class WaitForItHandler implements EventHandler {
+
+ private Event event = null;
+
+ @Override
+ public synchronized void handle(Event event) {
+ this.event = event;
+ notifyAll();
+ }
+
+ public synchronized Event getAndClearEvent() throws InterruptedException {
+ if (event == null) {
+ //Wait for at most 100 ms
+ wait(100);
+ }
+ Event e = event;
+ event = null;
+ return e;
+ }
+
+ }
+
+ static String stagingDir = "target/test-staging/";
+
+ @BeforeClass
+ public static void setup() {
+ File dir = new File(stagingDir);
+ stagingDir = dir.getAbsolutePath();
+ }
+ @Before
+ public void cleanup() throws IOException {
+ File dir = new File(stagingDir);
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ dir.mkdirs();
+ }
+
@Test
public void testCommitWindow() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -55,6 +124,10 @@ public class TestCommitterEventHandler {
SystemClock clock = new SystemClock();
AppContext appContext = mock(AppContext.class);
+ ApplicationAttemptId attemptid =
+ ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+ when(appContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+ when(appContext.getApplicationAttemptId()).thenReturn(attemptid);
when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler());
when(appContext.getClock()).thenReturn(clock);
@@ -91,6 +164,9 @@ public class TestCommitterEventHandler {
1, jeh.numCommitCompletedEvents);
verify(committer, times(1)).commitJob(any(JobContext.class));
+ //Clean up so we can try to commit again (Don't do this at home)
+ cleanup();
+
// try to commit again and verify it goes through since the heartbeat
// is still fresh
ceh.handle(new CommitterJobCommitEvent(null, null));
@@ -147,4 +223,103 @@ public class TestCommitterEventHandler {
}
}
}
+
+ @Test
+ public void testBasic() throws Exception {
+ AppContext mockContext = mock(AppContext.class);
+ OutputCommitter mockCommitter = mock(OutputCommitter.class);
+ Clock mockClock = mock(Clock.class);
+
+ CommitterEventHandler handler = new CommitterEventHandler(mockContext,
+ mockCommitter, new TestingRMHeartbeatHandler());
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ JobContext mockJobContext = mock(JobContext.class);
+ ApplicationAttemptId attemptid =
+ ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(attemptid.getApplicationId()));
+
+ WaitForItHandler waitForItHandler = new WaitForItHandler();
+
+ when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+ when(mockContext.getApplicationAttemptId()).thenReturn(attemptid);
+ when(mockContext.getEventHandler()).thenReturn(waitForItHandler);
+ when(mockContext.getClock()).thenReturn(mockClock);
+
+ handler.init(conf);
+ handler.start();
+ try {
+ handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext));
+
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+ Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user,
+ jobId);
+ Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user,
+ jobId);
+
+ Event e = waitForItHandler.getAndClearEvent();
+ assertNotNull(e);
+ assertTrue(e instanceof JobCommitCompletedEvent);
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(startCommitFile.toString(), fs.exists(startCommitFile));
+ assertTrue(endCommitSuccessFile.toString(), fs.exists(endCommitSuccessFile));
+ assertFalse(endCommitFailureFile.toString(), fs.exists(endCommitFailureFile));
+ verify(mockCommitter).commitJob(any(JobContext.class));
+ } finally {
+ handler.stop();
+ }
+ }
+
+ @Test
+ public void testFailure() throws Exception {
+ AppContext mockContext = mock(AppContext.class);
+ OutputCommitter mockCommitter = mock(OutputCommitter.class);
+ Clock mockClock = mock(Clock.class);
+
+ CommitterEventHandler handler = new CommitterEventHandler(mockContext,
+ mockCommitter, new TestingRMHeartbeatHandler());
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ JobContext mockJobContext = mock(JobContext.class);
+ ApplicationAttemptId attemptid =
+ ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(attemptid.getApplicationId()));
+
+ WaitForItHandler waitForItHandler = new WaitForItHandler();
+
+ when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+ when(mockContext.getApplicationAttemptId()).thenReturn(attemptid);
+ when(mockContext.getEventHandler()).thenReturn(waitForItHandler);
+ when(mockContext.getClock()).thenReturn(mockClock);
+
+ doThrow(new YarnException("Intentional Failure")).when(mockCommitter)
+ .commitJob(any(JobContext.class));
+
+ handler.init(conf);
+ handler.start();
+ try {
+ handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext));
+
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+ Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user,
+ jobId);
+ Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user,
+ jobId);
+
+ Event e = waitForItHandler.getAndClearEvent();
+ assertNotNull(e);
+ assertTrue(e instanceof JobCommitFailedEvent);
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(fs.exists(startCommitFile));
+ assertFalse(fs.exists(endCommitSuccessFile));
+ assertTrue(fs.exists(endCommitFailureFile));
+ verify(mockCommitter).commitJob(any(JobContext.class));
+ } finally {
+ handler.stop();
+ }
+ }
}
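
One caveat about the WaitForItHandler helper above: getAndClearEvent() waits on a single notification for at most 100 ms, so an event that arrives after the timeout is silently missed. A more defensive variant (a sketch, not part of the patch) would poll against a deadline:

    public synchronized Event getAndClearEvent() throws InterruptedException {
      long deadline = System.currentTimeMillis() + 100;
      while (event == null && System.currentTimeMillis() < deadline) {
        wait(10); // re-check until the event lands or the deadline passes
      }
      Event e = event;
      event = null;
      return e;
    }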
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Jan 4 20:38:36 2013
@@ -23,11 +23,13 @@ import static org.mockito.Mockito.doThro
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
@@ -67,8 +69,11 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
@@ -78,10 +83,28 @@ import org.junit.Test;
@SuppressWarnings({"rawtypes"})
public class TestJobImpl {
+ static String stagingDir = "target/test-staging/";
+
+ @BeforeClass
+ public static void setup() {
+ File dir = new File(stagingDir);
+ stagingDir = dir.getAbsolutePath();
+ }
+
+ @Before
+ public void cleanup() throws IOException {
+ File dir = new File(stagingDir);
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ dir.mkdirs();
+ }
+
@Test
public void testJobNoTasks() {
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -103,6 +126,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testCommitJobFailsJob() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -127,6 +151,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testCheckJobCompleteSuccess() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -151,6 +176,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testKilledDuringSetup() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -187,6 +213,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testKilledDuringCommit() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -211,6 +238,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testKilledDuringFailAbort() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -252,6 +280,7 @@ public class TestJobImpl {
@Test(timeout=20000)
public void testKilledDuringKillAbort() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -316,7 +345,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -327,7 +356,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -338,7 +367,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -349,7 +378,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -360,7 +389,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
- null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@@ -378,7 +407,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, true, null, 0, null, null);
+ mrAppMetrics, true, null, 0, null, null, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
@@ -389,7 +418,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, true, null, 0, null, null);
+ mrAppMetrics, true, null, 0, null, null, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
@@ -444,7 +473,7 @@ public class TestJobImpl {
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, null,
- mrAppMetrics, true, null, 0, null, null);
+ mrAppMetrics, true, null, 0, null, null, null, null);
InitTransition initTransition = getInitTransition(2);
JobEvent mockJobEvent = mock(JobEvent.class);
initTransition.transition(job, mockJobEvent);
@@ -518,6 +547,10 @@ public class TestJobImpl {
callback.run();
}
};
+ ApplicationAttemptId id =
+ ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+ when(appContext.getApplicationID()).thenReturn(id.getApplicationId());
+ when(appContext.getApplicationAttemptId()).thenReturn(id);
CommitterEventHandler handler =
new CommitterEventHandler(appContext, committer, heartbeatHandler);
dispatcher.register(CommitterEventType.class, handler);
@@ -601,7 +634,8 @@ public class TestJobImpl {
super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(),
new SystemClock(), null, MRAppMetrics.create(),
- newApiCommitter, user, System.currentTimeMillis(), null, null);
+ newApiCommitter, user, System.currentTimeMillis(), null, null, null,
+ null);
initTransition = getInitTransition(numSplits);
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
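
Every JobImpl construction in this file gains two trailing arguments. Judging from the createJob override in TestStagingCleanup above, those are forcedState and diagnostic; a sketch of what a non-null pair might look like, with illustrative values that do not appear in this patch:

    JobImpl job = new JobImpl(jobId, applicationAttemptId, conf, eventHandler,
        null, new JobTokenSecretManager(), new Credentials(),
        new SystemClock(), null, MRAppMetrics.create(),
        newApiCommitter, user, System.currentTimeMillis(), null, null,
        JobStateInternal.SUCCEEDED, "Job committed by a previous attempt");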
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Fri Jan 4 20:38:36 2013
@@ -182,14 +182,16 @@ public class JobHistoryUtils {
/**
* Gets the configured directory prefix for In Progress history files.
- * @param conf
+ * @param conf the configuration for the job
+ * @param jobId the id of the job the history file is for.
* @return A string representation of the prefix.
*/
public static String
- getConfiguredHistoryStagingDirPrefix(Configuration conf)
+ getConfiguredHistoryStagingDirPrefix(Configuration conf, String jobId)
throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
- Path path = MRApps.getStagingAreaDir(conf, user);
+ Path stagingPath = MRApps.getStagingAreaDir(conf, user);
+ Path path = new Path(stagingPath, jobId);
String logDir = path.toString();
return logDir;
}
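
The effect of the new jobId parameter is that each job's in-progress history now lives under its own subdirectory of the staging area instead of the shared staging root. A usage sketch, with an assumed jobId in scope and an illustrative result path:

    String prefix =
        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId.toString());
    // e.g. something like <staging-area>/<user>/.staging/job_1317529182569_0004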
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Fri Jan 4 20:38:36 2013
@@ -255,7 +255,26 @@ public class MRApps extends Apps {
return jobFile.toString();
}
-
+ public static Path getEndJobCommitSuccessFile(Configuration conf, String user,
+ JobId jobId) {
+ Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+ jobId.toString() + Path.SEPARATOR + "COMMIT_SUCCESS");
+ return endCommitFile;
+ }
+
+ public static Path getEndJobCommitFailureFile(Configuration conf, String user,
+ JobId jobId) {
+ Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+ jobId.toString() + Path.SEPARATOR + "COMMIT_FAIL");
+ return endCommitFile;
+ }
+
+ public static Path getStartJobCommitFile(Configuration conf, String user,
+ JobId jobId) {
+ Path startCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+ jobId.toString() + Path.SEPARATOR + "COMMIT_STARTED");
+ return startCommitFile;
+ }
private static long[] parseTimeStamps(String[] strs) {
if (null == strs) {
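
Taken together, the three MRApps helpers added above define the marker-file protocol that keeps a restarted AM from rerunning a job whose final status was already reported to the client (the subject of MAPREDUCE-4819). A hypothetical sketch of how a recovering AM could consult them; conf, user, and jobId are assumed to be in scope, and the branch bodies are placeholders rather than the patch's actual recovery logic:

    FileSystem fs = FileSystem.get(conf);
    Path started = MRApps.getStartJobCommitFile(conf, user, jobId);
    Path succeeded = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
    Path failed = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
    if (fs.exists(succeeded)) {
      // the previous attempt committed and reported success: do not rerun
    } else if (fs.exists(failed)) {
      // the previous attempt failed its commit: fail without rerunning
    } else if (fs.exists(started)) {
      // a commit was in flight when the previous attempt died: outcome
      // unknown, so treat the job as errored rather than rerun the commit
    }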