Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2013/01/04 21:38:37 UTC

svn commit: r1429115 [1/2] - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...

Author: bobby
Date: Fri Jan  4 20:38:36 2013
New Revision: 1429115

URL: http://svn.apache.org/viewvc?rev=1429115&view=rev
Log:
svn merge -c 1429114 FIXES: MAPREDUCE-4819. AM can rerun job after reporting final job status to the client (bobby and Bikas Saha via bobby)
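
For readers skimming the diff: the fix leaves three zero-length marker files
in the job's staging directory around the output commit (start, end-success,
end-failure). On startup the AM probes them and, if a previous attempt had
already begun committing, forces the job into a terminal state instead of
rerunning it. A condensed, illustrative sketch of that decision (names follow
the patch below; this is not the committed code verbatim):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: condenses the startup check MRAppMaster.init()
    // gains in this patch. The enum mirrors JobStateInternal; RUNNING stands
    // in for "no forced state, recover normally".
    class CommitMarkerCheck {
      enum State { RUNNING, SUCCEEDED, FAILED, ERROR }

      static State resolve(FileSystem fs, Path start, Path success,
          Path failure) throws IOException {
        if (!fs.exists(start)) {
          return State.RUNNING;    // no commit ever started; normal recovery
        }
        if (fs.exists(success)) {
          return State.SUCCEEDED;  // crashed after the commit succeeded
        }
        if (fs.exists(failure)) {
          return State.FAILED;     // crashed after the commit failed
        }
        return State.ERROR;        // crashed mid-commit; outcome unknown
      }
    }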

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
      - copied unchanged from r1429114, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java
      - copied unchanged from r1429114, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Jan  4 20:38:36 2013
@@ -504,6 +504,9 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4894. Renewal / cancellation of JobHistory tokens (Siddharth
     Seth via tgraves)
 
+    MAPREDUCE-4819. AM can rerun job after reporting final job status to the
+    client (bobby and Bikas Saha via bobby)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Fri Jan  4 20:38:36 2013
@@ -116,12 +116,15 @@ public class JobHistoryEventHandler exte
    */
   @Override
   public void init(Configuration conf) {
-
+    String jobId =
+      TypeConverter.fromYarn(context.getApplicationID()).toString();
+    
     String stagingDirStr = null;
     String doneDirStr = null;
     String userDoneDirStr = null;
     try {
-      stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+      stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf,
+          jobId);
       doneDirStr =
           JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
       userDoneDirStr =
@@ -881,7 +884,7 @@ public class JobHistoryEventHandler exte
   private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
     // check if path exists, in case of retries it may not exist
     if (stagingDirFS.exists(fromPath)) {
-      LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString());
+      LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
       // TODO temporarily removing the existing dst
       if (doneDirFS.exists(toPath)) {
         doneDirFS.delete(toPath, true);
@@ -892,11 +895,9 @@ public class JobHistoryEventHandler exte
       if (copied)
         LOG.info("Copied to done location: " + toPath);
       else 
-          LOG.info("copy failed");
+        LOG.info("copy failed");
       doneDirFS.setPermission(toPath, new FsPermission(
           JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
-      
-      stagingDirFS.delete(fromPath, false);
     }
   }
 

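Note the deleted stagingDirFS.delete(fromPath, false) above: after this patch
the history file must remain in staging even once it has been copied to the
done directory, so that a restarted AM (via the new JobHistoryCopyService)
can still replay it. A hedged sketch of how a later attempt locates that
file, built from the same utilities this patch touches in RecoveryService
further down:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TypeConverter;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;

    class PreviousHistoryLocator {
      // Sketch, not committed code: finding the previous attempt's history
      // file only works because moveToDoneNow() no longer deletes the
      // staging copy.
      static Path previousHistoryFile(Configuration conf,
          ApplicationAttemptId attemptId) throws IOException {
        String jobId =
            TypeConverter.fromYarn(attemptId.getApplicationId()).toString();
        String stagingDir =
            JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
        return JobHistoryUtils.getStagingJobHistoryFile(new Path(stagingDir),
            jobId, attemptId.getAttemptId() - 1);
      }
    }
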
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Jan  4 20:38:36 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -66,6 +67,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -91,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
@@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
@@ -179,9 +183,13 @@ public class MRAppMaster extends Composi
 
   private Job job;
   private Credentials fsTokens = new Credentials(); // Filled during init
-  private UserGroupInformation currentUser; // Will be setup during init
+  protected UserGroupInformation currentUser; // Will be setup during init
 
   private volatile boolean isLastAMRetry = false;
+  //Something happened and we should shut down right after we start up.
+  boolean errorHappenedShutDown = false;
+  private String shutDownMessage = null;
+  JobStateInternal forcedState = null;
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
@@ -242,94 +250,175 @@ public class MRAppMaster extends Composi
       newApiCommitter = true;
       LOG.info("Using mapred newApiCommitter.");
     }
-
-    committer = createOutputCommitter(conf);
-    boolean recoveryEnabled = conf.getBoolean(
-        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
-    if (recoveryEnabled && recoverySupportedByCommitter
-        && appAttemptID.getAttemptId() > 1) {
-      LOG.info("Recovery is enabled. "
-          + "Will try to recover from previous life on best effort basis.");
-      recoveryServ = createRecoveryService(context);
-      addIfService(recoveryServ);
-      dispatcher = recoveryServ.getDispatcher();
-      clock = recoveryServ.getClock();
-      inRecovery = true;
-    } else {
-      LOG.info("Not starting RecoveryService: recoveryEnabled: "
-          + recoveryEnabled + " recoverySupportedByCommitter: "
-          + recoverySupportedByCommitter + " ApplicationAttemptID: "
-          + appAttemptID.getAttemptId());
+    
+    boolean copyHistory = false;
+    try {
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+      FileSystem fs = getFileSystem(conf);
+      boolean stagingExists = fs.exists(stagingDir);
+      Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+      boolean commitStarted = fs.exists(startCommitFile);
+      Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
+      boolean commitSuccess = fs.exists(endCommitSuccessFile);
+      Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
+      boolean commitFailure = fs.exists(endCommitFailureFile);
+      if(!stagingExists) {
+        isLastAMRetry = true;
+        errorHappenedShutDown = true;
+        forcedState = JobStateInternal.ERROR;
+        shutDownMessage = "Staging dir does not exist " + stagingDir;
+        LOG.fatal(shutDownMessage);
+      } else if (commitStarted) {
+        //A commit was started, so this is the last attempt; we just need to
+        // know what result to report and how to unregister
+        errorHappenedShutDown = true;
+        isLastAMRetry = true;
+        copyHistory = true;
+        if (commitSuccess) {
+          shutDownMessage = "We crashed after successfully committing. Recovering.";
+          forcedState = JobStateInternal.SUCCEEDED;
+        } else if (commitFailure) {
+          shutDownMessage = "We crashed after a commit failure.";
+          forcedState = JobStateInternal.FAILED;
+        } else {
+          //The commit is still pending, commit error
+          shutDownMessage = "We crashed during a commit";
+          forcedState = JobStateInternal.ERROR;
+        }
+      }
+    } catch (IOException e) {
+      throw new YarnException("Error while initializing", e);
+    }
+    
+    if (errorHappenedShutDown) {
       dispatcher = createDispatcher();
       addIfService(dispatcher);
-    }
+      
+      EventHandler<JobHistoryEvent> historyService = null;
+      if (copyHistory) {
+        historyService = 
+          createJobHistoryHandler(context);
+        dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+            historyService);
+      }
+      NoopEventHandler eater = new NoopEventHandler();
+      //We do not have a JobEventDispatcher in this path
+      dispatcher.register(JobEventType.class, eater);
+      
+      // service to allocate containers from RM (if non-uber) or to fake it (uber)
+      containerAllocator = createContainerAllocator(null, context);
+      addIfService(containerAllocator);
+      dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+      if (copyHistory) {
+        // Add the staging directory cleaner before the history server but after
+        // the container allocator so the staging directory is cleaned after
+        // the history has been flushed but before unregistering with the RM.
+        addService(createStagingDirCleaningService());
+
+        // Add the JobHistoryEventHandler last so that it is properly stopped first.
+        // This will guarantee that all history-events are flushed before AM goes
+        // ahead with shutdown.
+        // Note: Even though JobHistoryEventHandler is started last, if any
+        // component creates a JobHistoryEvent in the meanwhile, it will just be
+        // queued inside the JobHistoryEventHandler 
+        addIfService(historyService);
+        
 
-    //service to handle requests from JobClient
-    clientService = createClientService(context);
-    addIfService(clientService);
-
-    containerAllocator = createContainerAllocator(clientService, context);
-
-    //service to handle requests to TaskUmbilicalProtocol
-    taskAttemptListener = createTaskAttemptListener(context);
-    addIfService(taskAttemptListener);
-
-    //service to handle the output committer
-    committerEventHandler = createCommitterEventHandler(context, committer);
-    addIfService(committerEventHandler);
+        JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
+            dispatcher.getEventHandler());
+        addIfService(cpHist);
+      }
+    } else {
+      committer = createOutputCommitter(conf);
+      boolean recoveryEnabled = conf.getBoolean(
+          MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+      boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+      if (recoveryEnabled && recoverySupportedByCommitter
+          && appAttemptID.getAttemptId() > 1) {
+        LOG.info("Recovery is enabled. "
+            + "Will try to recover from previous life on best effort basis.");
+        recoveryServ = createRecoveryService(context);
+        addIfService(recoveryServ);
+        dispatcher = recoveryServ.getDispatcher();
+        clock = recoveryServ.getClock();
+        inRecovery = true;
+      } else {
+        LOG.info("Not starting RecoveryService: recoveryEnabled: "
+            + recoveryEnabled + " recoverySupportedByCommitter: "
+            + recoverySupportedByCommitter + " ApplicationAttemptID: "
+            + appAttemptID.getAttemptId());
+        dispatcher = createDispatcher();
+        addIfService(dispatcher);
+      }
+
+      //service to handle requests from JobClient
+      clientService = createClientService(context);
+      addIfService(clientService);
+      
+      containerAllocator = createContainerAllocator(clientService, context);
+      
+      //service to handle the output committer
+      committerEventHandler = createCommitterEventHandler(context, committer);
+      addIfService(committerEventHandler);
+
+      //service to handle requests to TaskUmbilicalProtocol
+      taskAttemptListener = createTaskAttemptListener(context);
+      addIfService(taskAttemptListener);
 
-    //service to log job history events
-    EventHandler<JobHistoryEvent> historyService = 
+      //service to log job history events
+      EventHandler<JobHistoryEvent> historyService = 
         createJobHistoryHandler(context);
-    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
-        historyService);
+      dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+          historyService);
 
-    this.jobEventDispatcher = new JobEventDispatcher();
+      this.jobEventDispatcher = new JobEventDispatcher();
 
-    //register the event dispatchers
-    dispatcher.register(JobEventType.class, jobEventDispatcher);
-    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-    dispatcher.register(TaskAttemptEventType.class, 
-        new TaskAttemptEventDispatcher());
-    dispatcher.register(CommitterEventType.class, committerEventHandler);
-   
-    if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
-        || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
-      //optional service to speculate on task attempts' progress
-      speculator = createSpeculator(conf, context);
-      addIfService(speculator);
-    }
-
-    speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
-    dispatcher.register(Speculator.EventType.class,
-        speculatorEventDispatcher);
-
-    // service to allocate containers from RM (if non-uber) or to fake it (uber)
-    addIfService(containerAllocator);
-    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
-
-    // corresponding service to launch allocated containers via NodeManager
-    containerLauncher = createContainerLauncher(context);
-    addIfService(containerLauncher);
-    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
-
-    // Add the staging directory cleaner before the history server but after
-    // the container allocator so the staging directory is cleaned after
-    // the history has been flushed but before unregistering with the RM.
-    addService(createStagingDirCleaningService());
-
-    // Add the JobHistoryEventHandler last so that it is properly stopped first.
-    // This will guarantee that all history-events are flushed before AM goes
-    // ahead with shutdown.
-    // Note: Even though JobHistoryEventHandler is started last, if any
-    // component creates a JobHistoryEvent in the meanwhile, it will be just be
-    // queued inside the JobHistoryEventHandler 
-    addIfService(historyService);
+      //register the event dispatchers
+      dispatcher.register(JobEventType.class, jobEventDispatcher);
+      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+      dispatcher.register(TaskAttemptEventType.class, 
+          new TaskAttemptEventDispatcher());
+      dispatcher.register(CommitterEventType.class, committerEventHandler);
+
+      if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+          || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+        //optional service to speculate on task attempts' progress
+        speculator = createSpeculator(conf, context);
+        addIfService(speculator);
+      }
 
+      speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
+      dispatcher.register(Speculator.EventType.class,
+          speculatorEventDispatcher);
+
+      // service to allocate containers from RM (if non-uber) or to fake it (uber)
+      addIfService(containerAllocator);
+      dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+      // corresponding service to launch allocated containers via NodeManager
+      containerLauncher = createContainerLauncher(context);
+      addIfService(containerLauncher);
+      dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+
+      // Add the staging directory cleaner before the history server but after
+      // the container allocator so the staging directory is cleaned after
+      // the history has been flushed but before unregistering with the RM.
+      addService(createStagingDirCleaningService());
+
+      // Add the JobHistoryEventHandler last so that it is properly stopped first.
+      // This will guarantee that all history-events are flushed before AM goes
+      // ahead with shutdown.
+      // Note: Even though JobHistoryEventHandler is started last, if any
+      // component creates a JobHistoryEvent in the meanwhile, it will just be
+      // queued inside the JobHistoryEventHandler 
+      addIfService(historyService);
+    }
+    
     super.init(conf);
   } // end of init()
-
+  
   protected Dispatcher createDispatcher() {
     return new AsyncDispatcher();
   }
@@ -489,15 +578,20 @@ public class MRAppMaster extends Composi
         appContext.getClock(), getCommitter());
   }
 
-  /** Create and initialize (but don't start) a single job. */
-  protected Job createJob(Configuration conf) {
+  /** Create and initialize (but don't start) a single job. 
+   * @param forcedState a state to force the job into or null for normal operation. 
+   * @param diagnostic a diagnostic message to include with the job.
+   */
+  protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+      String diagnostic) {
 
     // create single job
     Job newJob =
         new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
             completedTasksFromPreviousRun, metrics, newApiCommitter,
-            currentUser.getUserName(), appSubmitTime, amInfos, context);
+            currentUser.getUserName(), appSubmitTime, amInfos, context, 
+            forcedState, diagnostic);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
@@ -874,7 +968,7 @@ public class MRAppMaster extends Composi
     amInfos.add(amInfo);
 
     // /////////////////// Create the job itself.
-    job = createJob(getConfig());
+    job = createJob(getConfig(), forcedState, shutDownMessage);
 
     // End of creating the job.
 
@@ -891,31 +985,33 @@ public class MRAppMaster extends Composi
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
 
-    // create a job event for job intialization
-    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
-    // Send init to the job (this does NOT trigger job execution)
-    // This is a synchronous call, not an event through dispatcher. We want
-    // job-init to be done completely here.
-    jobEventDispatcher.handle(initJobEvent);
-
-
-    // JobImpl's InitTransition is done (call above is synchronous), so the
-    // "uber-decision" (MR-1220) has been made.  Query job and switch to
-    // ubermode if appropriate (by registering different container-allocator
-    // and container-launcher services/event-handlers).
-
-    if (job.isUber()) {
-      speculatorEventDispatcher.disableSpeculation();
-      LOG.info("MRAppMaster uberizing job " + job.getID()
-               + " in local container (\"uber-AM\") on node "
-               + nmHost + ":" + nmPort + ".");
-    } else {
-      // send init to speculator only for non-uber jobs. 
-      // This won't yet start as dispatcher isn't started yet.
-      dispatcher.getEventHandler().handle(
-          new SpeculatorEvent(job.getID(), clock.getTime()));
-      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
-               + "job " + job.getID() + ".");
+    if (!errorHappenedShutDown) {
+      // create a job event for job initialization
+      JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+      // Send init to the job (this does NOT trigger job execution)
+      // This is a synchronous call, not an event through dispatcher. We want
+      // job-init to be done completely here.
+      jobEventDispatcher.handle(initJobEvent);
+
+
+      // JobImpl's InitTransition is done (call above is synchronous), so the
+      // "uber-decision" (MR-1220) has been made.  Query job and switch to
+      // ubermode if appropriate (by registering different container-allocator
+      // and container-launcher services/event-handlers).
+
+      if (job.isUber()) {
+        speculatorEventDispatcher.disableSpeculation();
+        LOG.info("MRAppMaster uberizing job " + job.getID()
+            + " in local container (\"uber-AM\") on node "
+            + nmHost + ":" + nmPort + ".");
+      } else {
+        // send init to speculator only for non-uber jobs. 
+        // This won't yet start as dispatcher isn't started yet.
+        dispatcher.getEventHandler().handle(
+            new SpeculatorEvent(job.getID(), clock.getTime()));
+        LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+            + "job " + job.getID() + ".");
+      }
     }
 
     //start all the components
@@ -1062,6 +1158,17 @@ public class MRAppMaster extends Composi
 
   }
 
+  /**
+   * Eats events that are not needed in some error cases.
+   */
+  private static class NoopEventHandler implements EventHandler<Event> {
+
+    @Override
+    public void handle(Event event) {
+      //Empty
+    }
+  }
+  
   private static void validateInputParam(String value, String param)
       throws IOException {
     if (value == null) {
@@ -1158,6 +1265,9 @@ public class MRAppMaster extends Composi
       public Object run() throws Exception {
         appMaster.init(conf);
         appMaster.start();
+        if(appMaster.errorHappenedShutDown) {
+          throw new IOException("Was asked to shut down.");
+        }
         return null;
       }
     });

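In the errorHappenedShutDown branch the AM deliberately builds only a
skeleton service set: enough to republish job history (when a commit had
started) and to unregister cleanly with the RM, but nothing that could run
tasks again. initAndStartAppMaster() then surfaces the condition to the
caller as an IOException. Condensed from the hunk above (same calls,
illustrative order, variables as declared in the class):

    // Sketch of the errorHappenedShutDown bootstrap, not the exact code.
    dispatcher = createDispatcher();
    addIfService(dispatcher);
    dispatcher.register(JobEventType.class, new NoopEventHandler()); // eat job events
    containerAllocator = createContainerAllocator(null, context);    // no ClientService
    addIfService(containerAllocator);
    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
    if (copyHistory) {
      dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
          historyService);
      addService(createStagingDirCleaningService()); // clean staging after flush
      addIfService(historyService);                  // flushed when stopped
      addIfService(new JobHistoryCopyService(appAttemptID,
          dispatcher.getEventHandler()));            // replay prior attempt's events
    }
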
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Fri Jan  4 20:38:36 2013
@@ -29,8 +29,13 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
@@ -40,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -64,6 +71,11 @@ public class CommitterEventHandler exten
   private Thread jobCommitThread = null;
   private int commitThreadCancelTimeoutMs;
   private long commitWindowMs;
+  private FileSystem fs;
+  private Path startCommitFile;
+  private Path endCommitSuccessFile;
+  private Path endCommitFailureFile;
+  
 
   public CommitterEventHandler(AppContext context, OutputCommitter committer,
       RMHeartbeatHandler rmHeartbeatHandler) {
@@ -82,10 +94,21 @@ public class CommitterEventHandler exten
         MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
     commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
         MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
+    try {
+      fs = FileSystem.get(conf);
+      JobID id = TypeConverter.fromYarn(context.getApplicationID());
+      JobId jobId = TypeConverter.toYarn(id);
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+      endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
+      endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
   }
 
   @Override
-  public void start() {
+  public void start() {    
     ThreadFactory tf = new ThreadFactoryBuilder()
       .setNameFormat("CommitterEvent Processor #%d")
       .build();
@@ -199,7 +222,7 @@ public class CommitterEventHandler exten
             + event.toString());
       }
     }
-
+    
     @SuppressWarnings("unchecked")
     protected void handleJobSetup(CommitterJobSetupEvent event) {
       try {
@@ -213,19 +236,30 @@ public class CommitterEventHandler exten
       }
     }
 
+    private void touchz(Path p) throws IOException {
+      fs.create(p, false).close();
+    }
+    
     @SuppressWarnings("unchecked")
     protected void handleJobCommit(CommitterJobCommitEvent event) {
       try {
+        touchz(startCommitFile);
         jobCommitStarted();
         waitForValidCommitWindow();
         committer.commitJob(event.getJobContext());
+        touchz(endCommitSuccessFile);
         context.getEventHandler().handle(
             new JobCommitCompletedEvent(event.getJobID()));
       } catch (Exception e) {
-          LOG.error("Could not commit job", e);
-          context.getEventHandler().handle(
-              new JobCommitFailedEvent(event.getJobID(),
-                  StringUtils.stringifyException(e)));
+        try {
+          touchz(endCommitFailureFile);
+        } catch (Exception e2) {
+          LOG.error("could not create failure file.", e2);
+        }
+        LOG.error("Could not commit job", e);
+        context.getEventHandler().handle(
+            new JobCommitFailedEvent(event.getJobID(),
+                StringUtils.stringifyException(e)));
       } finally {
         jobCommitEnded();
       }

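The commit itself is now bracketed by the marker files resolved in init():
touchz() writes the start marker before the commit window, and exactly one of
the success/failure markers is written once committer.commitJob() returns or
throws. A simplified, self-contained sketch of the protocol (event dispatch
and the commit window omitted):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class CommitMarkers {
      // Simplified from the hunk above; doCommit stands in for
      // committer.commitJob(jobContext).
      static void commitWithMarkers(FileSystem fs, Path start, Path success,
          Path failure, Runnable doCommit) throws IOException {
        fs.create(start, false).close();      // "touchz": empty, fails if present
        try {
          doCommit.run();
          fs.create(success, false).close();  // record the successful commit
        } catch (RuntimeException e) {
          fs.create(failure, false).close();  // record the failure first
          throw e;                            // then surface it as before
        }
      }
    }
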
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Jan  4 20:38:36 2013
@@ -540,6 +540,8 @@ public class JobImpl implements org.apac
   private Credentials fsTokens;
   private Token<JobTokenIdentifier> jobToken;
   private JobTokenSecretManager jobTokenSecretManager;
+  
+  private JobStateInternal forcedState = null;
 
   public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
       Configuration conf, EventHandler eventHandler,
@@ -548,7 +550,8 @@ public class JobImpl implements org.apac
       Credentials fsTokenCredentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
       boolean newApiCommitter, String userName,
-      long appSubmitTime, List<AMInfo> amInfos, AppContext appContext) {
+      long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
+      JobStateInternal forcedState, String forcedDiagnostic) {
     this.applicationAttemptId = applicationAttemptId;
     this.jobId = jobId;
     this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
@@ -579,6 +582,10 @@ public class JobImpl implements org.apac
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
+    this.forcedState  = forcedState;
+    if(forcedDiagnostic != null) {
+      this.diagnostics.add(forcedDiagnostic);
+    }
   }
 
   protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -818,7 +825,7 @@ public class JobImpl implements org.apac
   public JobState getState() {
     readLock.lock();
     try {
-      return getExternalState(getStateMachine().getCurrentState());
+      return getExternalState(getInternalState());
     } finally {
       readLock.unlock();
     }
@@ -868,6 +875,9 @@ public class JobImpl implements org.apac
   public JobStateInternal getInternalState() {
     readLock.lock();
     try {
+      if(forcedState != null) {
+        return forcedState;
+      }
      return getStateMachine().getCurrentState();
     } finally {
       readLock.unlock();

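Taken together, the two hunks above mean a non-null forcedState overrides the
state machine for every state query, so clients see the terminal state from
the moment the AM comes up. Restated as one method for emphasis (equivalent
to the change, within JobImpl and using its readLock and stateMachine
fields):

    public JobStateInternal getInternalState() {
      readLock.lock();
      try {
        return (forcedState != null)
            ? forcedState
            : getStateMachine().getCurrentState();
      } finally {
        readLock.unlock();
      }
    }
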
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Fri Jan  4 20:38:36 2013
@@ -205,18 +205,18 @@ public class RecoveryService extends Com
       throws IOException {
     FSDataInputStream in = null;
     Path historyFile = null;
-    String jobName =
+    String jobId =
         TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
           .toString();
     String jobhistoryDir =
-        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
     Path histDirPath =
         FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
     // read the previous history file
     historyFile =
         fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
-          jobName, (applicationAttemptId.getAttemptId() - 1)));
+          jobId, (applicationAttemptId.getAttemptId() - 1)));
     LOG.info("History file is at " + historyFile);
     in = fc.open(historyFile);
     return in;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Jan  4 20:38:36 2013
@@ -141,14 +141,19 @@ public abstract class RMCommunicator ext
 
   protected void register() {
     //Register
-    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    InetSocketAddress serviceAddr = null;
+    if (clientService != null ) {
+      serviceAddr = clientService.getBindAddress();
+    }
     try {
       RegisterApplicationMasterRequest request =
         recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
       request.setApplicationAttemptId(applicationAttemptId);
-      request.setHost(serviceAddr.getHostName());
-      request.setRpcPort(serviceAddr.getPort());
-      request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      if (serviceAddr != null) {
+        request.setHost(serviceAddr.getHostName());
+        request.setRpcPort(serviceAddr.getPort());
+        request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      }
       RegisterApplicationMasterResponse response =
         scheduler.registerApplicationMaster(request);
       minContainerCapability = response.getMinimumResourceCapability();

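register() gains the null checks because the forced-shutdown AM creates its
allocator with createContainerAllocator(null, context): it still registers
with the RM (so it can later unregister with the final status) but advertises
no host, RPC port, or tracking URL. A minimal view of the guarded request
construction, as restructured above:

    // serviceAddr may legitimately be null on the forced-shutdown path.
    InetSocketAddress serviceAddr =
        (clientService != null) ? clientService.getBindAddress() : null;
    request.setApplicationAttemptId(applicationAttemptId);
    if (serviceAddr != null) {
      request.setHost(serviceAddr.getHostName());
      request.setRpcPort(serviceAddr.getPort());
      request.setTrackingUrl(
          serviceAddr.getHostName() + ":" + clientService.getHttpPort());
    }
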
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Fri Jan  4 20:38:36 2013
@@ -18,14 +18,9 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static junit.framework.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -51,7 +46,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
 
 public class TestJobHistoryEventHandler {
 
@@ -260,13 +254,15 @@ public class TestJobHistoryEventHandler 
     }
   }
 
-  private AppContext mockAppContext(JobId jobId) {
+  private AppContext mockAppContext(ApplicationId appId) {
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
     AppContext mockContext = mock(AppContext.class);
     Job mockJob = mock(Job.class);
     when(mockJob.getTotalMaps()).thenReturn(10);
     when(mockJob.getTotalReduces()).thenReturn(10);
     when(mockJob.getName()).thenReturn("mockjob");
     when(mockContext.getJob(jobId)).thenReturn(mockJob);
+    when(mockContext.getApplicationID()).thenReturn(appId);
     return mockContext;
   }
   
@@ -279,7 +275,7 @@ public class TestJobHistoryEventHandler 
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
     TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-    AppContext mockAppContext = mockAppContext(jobId);
+    AppContext mockAppContext = mockAppContext(appId);
   }
 
   private JobHistoryEvent getEventToEnqueue(JobId jobId) {

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Jan  4 20:38:36 2013
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -208,6 +209,16 @@ public class MRApp extends MRAppMaster {
 
   @Override
   public void init(Configuration conf) {
+    try {
+      //Create the staging directory if it does not exist
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+      FileSystem fs = getFileSystem(conf);
+      fs.mkdirs(stagingDir);
+    } catch (Exception e) {
+      throw new YarnException("Error creating staging dir", e);
+    }
+    
     super.init(conf);
     if (this.clusterInfo != null) {
       getContext().getClusterInfo().setMinContainerCapability(
@@ -388,7 +399,8 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected Job createJob(Configuration conf) {
+  protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+      String diagnostic) {
     UserGroupInformation currentUser = null;
     try {
       currentUser = UserGroupInformation.getCurrentUser();
@@ -398,7 +410,8 @@ public class MRApp extends MRAppMaster {
     Job newJob = new TestJob(getJobId(), getAttemptID(), conf, 
     		getDispatcher().getEventHandler(),
             getTaskAttemptListener(), getContext().getClock(),
-            isNewApiCommitter(), currentUser.getUserName(), getContext());
+            isNewApiCommitter(), currentUser.getUserName(), getContext(),
+            forcedState, diagnostic);
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -631,13 +644,14 @@ public class MRApp extends MRAppMaster {
     public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock,
-        boolean newApiCommitter, String user, AppContext appContext) {
+        boolean newApiCommitter, String user, AppContext appContext, 
+        JobStateInternal forcedState, String diagnostic) {
       super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
           conf, eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock,
           getCompletedTaskFromPreviousRun(), metrics,
           newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
-          appContext);
+          appContext, forcedState, diagnostic);
 
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Fri Jan  4 20:38:36 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
@@ -370,8 +371,9 @@ public class TestMRApp {
     }
 
     @Override
-    protected Job createJob(Configuration conf) {
-      spiedJob = spy((JobImpl) super.createJob(conf));
+    protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+        String diagnostic) {
+      spiedJob = spy((JobImpl) super.createJob(conf, forcedState, diagnostic));
       ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
       return spiedJob;
     }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Fri Jan  4 20:38:36 2013
@@ -17,28 +17,63 @@
  */
 package org.apache.hadoop.mapreduce.v2.app;
 
-import java.io.IOException;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
-import junit.framework.Assert;
+import java.io.File;
+import java.io.IOException;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestMRAppMaster {
+  private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
+  static String stagingDir = "staging/";
+  
+  @BeforeClass
+  public static void setup() {
+    //Do not error out if metrics are inited multiple times
+    DefaultMetricsSystem.setMiniClusterMode(true);
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
+  
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testMRAppMasterForDifferentUser() throws IOException,
       InterruptedException {
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
     String containerIdStr = "container_1317529182569_0004_000001_1";
-    String stagingDir = "/tmp/staging";
+    
     String userName = "TestAppMasterUser";
     ApplicationAttemptId applicationAttemptId = ConverterUtils
         .toApplicationAttemptId(applicationAttemptIdStr);
@@ -49,34 +84,208 @@ public class TestMRAppMaster {
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
-    Assert.assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+    assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
         + ".staging", appMaster.stagingDirPath.toString());
   }
+  
+  @Test
+  public void testMRAppMasterMidLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    //Create the file, but no end file so we should unregister with an error.
+    fs.create(start).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterSuccessLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    Path end = MRApps.getEndJobCommitSuccessFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    fs.create(start).close();
+    fs.create(end).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterFailLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId = TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    Path end = MRApps.getEndJobCommitFailureFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    fs.create(start).close();
+    fs.create(end).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IOException is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterMissingStaging() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+
+    //Delete the staging directory
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IOException is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    //History-file copying is disabled in this case, but that is not
+    //observable from here
+    assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+    appMaster.stop();
+  }
 }
 
 class MRAppMasterTest extends MRAppMaster {
 
   Path stagingDirPath;
   private Configuration conf;
+  private boolean overrideInitAndStart;
+  ContainerAllocator mockContainerAllocator;
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
       long submitTime) {
+    this(applicationAttemptId, containerId, host, port, httpPort, submitTime, 
+        true);
+  }
+  public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String host, int port, int httpPort,
+      long submitTime, boolean overrideInitAndStart) {
     super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
+    this.overrideInitAndStart = overrideInitAndStart;
+    mockContainerAllocator = mock(ContainerAllocator.class);
   }
 
   @Override
   public void init(Configuration conf) {
-    this.conf = conf;
+    if (overrideInitAndStart) {
+      this.conf = conf; 
+    } else {
+      super.init(conf);
+    }
+  }
+  
+  @Override 
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
+    try {
+      this.currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+  
+  @Override
+  protected ContainerAllocator createContainerAllocator(
+      final ClientService clientService, final AppContext context) {
+    return mockContainerAllocator;
   }
 
   @Override
   public void start() {
-    try {
-      String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      stagingDirPath = MRApps.getStagingAreaDir(conf, user);
-    } catch (Exception e) {
-      Assert.fail(e.getMessage());
+    if (overrideInitAndStart) {
+      try {
+        String user = UserGroupInformation.getCurrentUser().getShortUserName();
+        stagingDirPath = MRApps.getStagingAreaDir(conf, user);
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    } else {
+      super.start();
     }
   }
 

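A note on what the three lock tests above are checking: on startup the AM looks for the commit marker files in the staging directory and, if a previous attempt died during commit, forces the job into a terminal state instead of rerunning it. The following is an illustrative sketch only (detectPriorCommit is a hypothetical helper, not part of this patch); it maps the marker files to the forced states the tests assert:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
    import org.apache.hadoop.mapreduce.v2.util.MRApps;

    class CommitRecoverySketch {
      // Returns the state to force the recovered job into, or null if no
      // commit was in flight and the job can run normally.
      static JobStateInternal detectPriorCommit(FileSystem fs,
          Configuration conf, String user, JobId jobId) throws IOException {
        Path start = MRApps.getStartJobCommitFile(conf, user, jobId);
        if (!fs.exists(start)) {
          return null;                        // no commit was started
        }
        if (fs.exists(MRApps.getEndJobCommitSuccessFile(conf, user, jobId))) {
          return JobStateInternal.SUCCEEDED;  // commit completed successfully
        }
        if (fs.exists(MRApps.getEndJobCommitFailureFile(conf, user, jobId))) {
          return JobStateInternal.FAILED;     // commit completed with a failure
        }
        return JobStateInternal.ERROR;        // commit started but never ended
      }
    }
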
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Jan  4 20:38:36 2013
@@ -38,11 +38,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -72,6 +74,10 @@ import org.junit.Test;
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(0);
@@ -93,6 +99,10 @@ import org.junit.Test;
      conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(0);
@@ -118,6 +128,10 @@ import org.junit.Test;
      conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(1);
@@ -198,7 +212,8 @@ import org.junit.Test;
     }
 
     @Override
-    protected Job createJob(Configuration conf) {
+    protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+        String diagnostic) {
       UserGroupInformation currentUser = null;
       try {
         currentUser = UserGroupInformation.getCurrentUser();
@@ -208,7 +223,8 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          isNewApiCommitter(), currentUser.getUserName(), getContext());
+          isNewApiCommitter(), currentUser.getUserName(), getContext(),
+          forcedState, diagnostic);
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
       getDispatcher().register(JobFinishEvent.Type.class,

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java Fri Jan  4 20:38:36 2013
@@ -36,16 +36,85 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestCommitterEventHandler {
+  public static class WaitForItHandler implements EventHandler {
+
+    private Event event = null;
+    
+    @Override
+    public synchronized void handle(Event event) {
+      this.event = event;
+      notifyAll();
+    }
+    
+    public synchronized Event getAndClearEvent() throws InterruptedException {
+      if (event == null) {
+        //Wait for at most 100 ms
+        wait(100);
+      }
+      Event e = event;
+      event = null;
+      return e;
+    }
+    
+  }
+  
+  static String stagingDir = "target/test-staging/";
+
+  @BeforeClass
+  public static void setup() {    
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
 
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testCommitWindow() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -55,6 +124,10 @@ public class TestCommitterEventHandler {
 
     SystemClock clock = new SystemClock();
     AppContext appContext = mock(AppContext.class);
+    ApplicationAttemptId attemptid = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    when(appContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+    when(appContext.getApplicationAttemptId()).thenReturn(attemptid);
     when(appContext.getEventHandler()).thenReturn(
         dispatcher.getEventHandler());
     when(appContext.getClock()).thenReturn(clock);
@@ -91,6 +164,9 @@ public class TestCommitterEventHandler {
         1, jeh.numCommitCompletedEvents);
     verify(committer, times(1)).commitJob(any(JobContext.class));
 
+    //Clean up so we can try to commit again (Don't do this at home)
+    cleanup();
+    
     // try to commit again and verify it goes through since the heartbeat
     // is still fresh
     ceh.handle(new CommitterJobCommitEvent(null, null));
@@ -147,4 +223,103 @@ public class TestCommitterEventHandler {
       }
     }
   }
+
+  @Test
+  public void testBasic() throws Exception {
+    AppContext mockContext = mock(AppContext.class);
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    Clock mockClock = mock(Clock.class);
+    
+    CommitterEventHandler handler = new CommitterEventHandler(mockContext, 
+        mockCommitter, new TestingRMHeartbeatHandler());
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    JobContext mockJobContext = mock(JobContext.class);
+    ApplicationAttemptId attemptid = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    JobId jobId = TypeConverter.toYarn(
+        TypeConverter.fromYarn(attemptid.getApplicationId()));
+    
+    WaitForItHandler waitForItHandler = new WaitForItHandler();
+    
+    when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+    when(mockContext.getApplicationAttemptId()).thenReturn(attemptid);
+    when(mockContext.getEventHandler()).thenReturn(waitForItHandler);
+    when(mockContext.getClock()).thenReturn(mockClock);
+    
+    handler.init(conf);
+    handler.start();
+    try {
+      handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext));
+
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+      Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, 
+          jobId);
+      Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, 
+          jobId);
+
+      Event e = waitForItHandler.getAndClearEvent();
+      assertNotNull(e);
+      assertTrue(e instanceof JobCommitCompletedEvent);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(startCommitFile.toString(), fs.exists(startCommitFile));
+      assertTrue(endCommitSuccessFile.toString(), fs.exists(endCommitSuccessFile));
+      assertFalse(endCommitFailureFile.toString(), fs.exists(endCommitFailureFile));
+      verify(mockCommitter).commitJob(any(JobContext.class));
+    } finally {
+      handler.stop();
+    }
+  }
+
+  @Test
+  public void testFailure() throws Exception {
+    AppContext mockContext = mock(AppContext.class);
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    Clock mockClock = mock(Clock.class);
+    
+    CommitterEventHandler handler = new CommitterEventHandler(mockContext, 
+        mockCommitter, new TestingRMHeartbeatHandler());
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    JobContext mockJobContext = mock(JobContext.class);
+    ApplicationAttemptId attemptid = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    JobId jobId = TypeConverter.toYarn(
+        TypeConverter.fromYarn(attemptid.getApplicationId()));
+    
+    WaitForItHandler waitForItHandler = new WaitForItHandler();
+    
+    when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
+    when(mockContext.getApplicationAttemptId()).thenReturn(attemptid);
+    when(mockContext.getEventHandler()).thenReturn(waitForItHandler);
+    when(mockContext.getClock()).thenReturn(mockClock);
+    
+    doThrow(new YarnException("Intentional Failure")).when(mockCommitter)
+      .commitJob(any(JobContext.class));
+    
+    handler.init(conf);
+    handler.start();
+    try {
+      handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext));
+
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
+      Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, 
+          jobId);
+      Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, 
+          jobId);
+
+      Event e = waitForItHandler.getAndClearEvent();
+      assertNotNull(e);
+      assertTrue(e instanceof JobCommitFailedEvent);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(fs.exists(startCommitFile));
+      assertFalse(fs.exists(endCommitSuccessFile));
+      assertTrue(fs.exists(endCommitFailureFile));
+      verify(mockCommitter).commitJob(any(JobContext.class));
+    } finally {
+      handler.stop();
+    }
+  }
 }

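For readers following along: testBasic and testFailure above pin down the marker-file protocol the committer event handling now follows around the actual output commit. A minimal sketch of that protocol, assuming the MRApps path helpers from this patch (commitWithMarkers itself is illustrative, not the real handler code):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;

    class CommitMarkerSketch {
      // Touch COMMIT_STARTED before committing, then COMMIT_SUCCESS or
      // COMMIT_FAIL depending on the outcome, as the two tests verify.
      static void commitWithMarkers(FileSystem fs, OutputCommitter committer,
          JobContext jobContext, Path start, Path endSuccess, Path endFail)
          throws IOException {
        fs.create(start).close();             // COMMIT_STARTED
        try {
          committer.commitJob(jobContext);
          fs.create(endSuccess).close();      // COMMIT_SUCCESS
        } catch (Exception e) {
          fs.create(endFail).close();         // COMMIT_FAIL
          throw new IOException("Job commit failed", e);
        }
      }
    }
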
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Jan  4 20:38:36 2013
@@ -23,11 +23,13 @@ import static org.mockito.Mockito.doThro
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -67,8 +69,11 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 
@@ -78,10 +83,28 @@ import org.junit.Test;
 @SuppressWarnings({"rawtypes"})
 public class TestJobImpl {
   
+  static String stagingDir = "target/test-staging/";
+
+  @BeforeClass
+  public static void setup() {    
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
+
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testJobNoTasks() {
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -103,6 +126,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testCommitJobFailsJob() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -127,6 +151,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testCheckJobCompleteSuccess() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -151,6 +176,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringSetup() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -187,6 +213,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringCommit() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -211,6 +238,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringFailAbort() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -252,6 +280,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringKillAbort() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -316,7 +345,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -327,7 +356,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -338,7 +367,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -349,7 +378,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -360,7 +389,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -378,7 +407,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -389,7 +418,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -444,7 +473,7 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null, null, null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
@@ -518,6 +547,10 @@ public class TestJobImpl {
         callback.run();
       }
     };
+    ApplicationAttemptId id = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    when(appContext.getApplicationID()).thenReturn(id.getApplicationId());
+    when(appContext.getApplicationAttemptId()).thenReturn(id);
     CommitterEventHandler handler =
         new CommitterEventHandler(appContext, committer, heartbeatHandler);
     dispatcher.register(CommitterEventType.class, handler);
@@ -601,7 +634,8 @@ public class TestJobImpl {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
           new SystemClock(), null, MRAppMetrics.create(),
-          newApiCommitter, user, System.currentTimeMillis(), null, null);
+          newApiCommitter, user, System.currentTimeMillis(), null, null, null,
+          null);
 
       initTransition = getInitTransition(numSplits);
       localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Fri Jan  4 20:38:36 2013
@@ -182,14 +182,16 @@ public class JobHistoryUtils {
 
   /**
    * Gets the configured directory prefix for In Progress history files.
-   * @param conf
+   * @param conf the configuration for the job
+   * @param jobId the id of the job the history file is for.
    * @return A string representation of the prefix.
    */
   public static String
-      getConfiguredHistoryStagingDirPrefix(Configuration conf)
+      getConfiguredHistoryStagingDirPrefix(Configuration conf, String jobId)
           throws IOException {
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
-    Path path = MRApps.getStagingAreaDir(conf, user);
+    Path stagingPath = MRApps.getStagingAreaDir(conf, user);
+    Path path = new Path(stagingPath, jobId);
     String logDir = path.toString();
     return logDir;
   }

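The JobHistoryUtils change above makes the in-progress history prefix per job: the returned path is now the job's own subdirectory of the user's staging area rather than the shared .staging directory itself. Indicatively (exact paths depend on configuration):

    // Illustration only (exception handling elided); assumes the default
    // staging layout under yarn.app.mapreduce.am.staging-dir.
    String prefix = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(
        conf, jobId.toString());
    // e.g. /tmp/hadoop-yarn/staging/<user>/.staging/job_1317529182569_0004
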
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1429115&r1=1429114&r2=1429115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Fri Jan  4 20:38:36 2013
@@ -255,7 +255,26 @@ public class MRApps extends Apps {
     return jobFile.toString();
   }
   
-
+  public static Path getEndJobCommitSuccessFile(Configuration conf, String user,
+      JobId jobId) {
+    Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+        jobId.toString() + Path.SEPARATOR + "COMMIT_SUCCESS");
+    return endCommitFile;
+  }
+  
+  public static Path getEndJobCommitFailureFile(Configuration conf, String user,
+      JobId jobId) {
+    Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+        jobId.toString() + Path.SEPARATOR + "COMMIT_FAIL");
+    return endCommitFile;
+  }
+  
+  public static Path getStartJobCommitFile(Configuration conf, String user,
+      JobId jobId) {
+    Path startCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
+        jobId.toString() + Path.SEPARATOR + "COMMIT_STARTED");
+    return startCommitFile;
+  }
 
   private static long[] parseTimeStamps(String[] strs) {
     if (null == strs) {