You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/04/08 18:12:33 UTC

svn commit: r1465674 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/j...

Author: jlowe
Date: Mon Apr  8 16:12:32 2013
New Revision: 1465674

URL: http://svn.apache.org/r1465674
Log:
MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot command from the RM. Contributed by Jian He

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Apr  8 16:12:32 2013
@@ -110,6 +110,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5006. Fix failing streaming tests due to MAPREDUCE-4994.
     (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot
+    command from the RM. (Jian He via jlowe)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Apr  8 16:12:32 2013
@@ -549,8 +549,14 @@ public class MRAppMaster extends Composi
     }
 
     try {
-      //We are finishing cleanly so this is the last retry
-      isLastAMRetry = true;
+      //if isLastAMRetry comes as true, should never set it to false
+      if ( !isLastAMRetry){
+        if (((JobImpl)job).getInternalState() != JobStateInternal.REBOOT) {
+          LOG.info("We are finishing cleanly so this is the last retry");
+          isLastAMRetry = true;
+        }
+      }
+      notifyIsLastAMRetry(isLastAMRetry);
       // Stop all services
       // This will also send the final report to the ResourceManager
       LOG.info("Calling stop for all the services");
@@ -1272,19 +1278,25 @@ public class MRAppMaster extends Composi
       // that they don't take too long in shutting down
       if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
         ((ContainerAllocatorRouter) appMaster.containerAllocator)
-        .setSignalled(true);
-        ((ContainerAllocatorRouter) appMaster.containerAllocator)
-        .setShouldUnregister(appMaster.isLastAMRetry);
-      }
-      
-      if(appMaster.jobHistoryEventHandler != null) {
-        appMaster.jobHistoryEventHandler
-          .setForcejobCompletion(appMaster.isLastAMRetry);
+          .setSignalled(true);
       }
+      appMaster.notifyIsLastAMRetry(appMaster.isLastAMRetry);
       appMaster.stop();
     }
   }
 
+  public void notifyIsLastAMRetry(boolean isLastAMRetry){
+    if(containerAllocator instanceof ContainerAllocatorRouter) {
+      LOG.info("Notify RMCommunicator isAMLastRetry: " + isLastAMRetry);
+      ((ContainerAllocatorRouter) containerAllocator)
+        .setShouldUnregister(isLastAMRetry);
+    }
+    if(jobHistoryEventHandler != null) {
+      LOG.info("Notify JHEH isAMLastRetry: " + isLastAMRetry);
+      jobHistoryEventHandler.setForcejobCompletion(isLastAMRetry);
+    }
+  }
+
   protected static void initAndStartAppMaster(final MRAppMaster appMaster,
       final YarnConfiguration conf, String jobUserName) throws IOException,
       InterruptedException {

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Mon Apr  8 16:12:32 2013
@@ -30,5 +30,6 @@ public enum JobStateInternal {
   KILL_WAIT,
   KILL_ABORT,
   KILLED,
-  ERROR
+  ERROR,
+  REBOOT
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Mon Apr  8 16:12:32 2013
@@ -54,6 +54,6 @@ public enum JobEventType {
   JOB_TASK_ATTEMPT_FETCH_FAILURE,
   
   //Producer:RMContainerAllocator
-  JOB_UPDATED_NODES
-  
+  JOB_UPDATED_NODES,
+  JOB_AM_REBOOT
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Apr  8 16:12:32 2013
@@ -215,6 +215,8 @@ public class JobImpl implements org.apac
       DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition
       INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final InternalRebootTransition
+      INTERNAL_REBOOT_TRANSITION = new InternalRebootTransition();
   private static final TaskAttemptCompletedEventTransition
       TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new TaskAttemptCompletedEventTransition();
@@ -246,6 +248,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
           // Ignore-able events
           .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_UPDATED_NODES)
@@ -265,6 +270,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(JobStateInternal.INITED, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
           // Ignore-able events
           .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_UPDATED_NODES)
@@ -287,6 +295,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
           // Ignore-able events
           .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
               JobEventType.JOB_UPDATED_NODES)
@@ -327,6 +338,9 @@ public class JobImpl implements org.apac
               JobStateInternal.RUNNING,
               JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
 
           // Transitions from KILL_WAIT state.
           .addTransition
@@ -352,7 +366,8 @@ public class JobImpl implements org.apac
               EnumSet.of(JobEventType.JOB_KILL,
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from COMMITTING state
           .addTransition(JobStateInternal.COMMITTING,
@@ -377,7 +392,10 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.COMMITTING,
               JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-          // Ignore-able events
+          .addTransition(JobStateInternal.COMMITTING, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
+              // Ignore-able events
           .addTransition(JobStateInternal.COMMITTING,
               JobStateInternal.COMMITTING,
               EnumSet.of(JobEventType.JOB_UPDATED_NODES,
@@ -397,7 +415,8 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from FAIL_ABORT state
           .addTransition(JobStateInternal.FAIL_ABORT,
@@ -425,7 +444,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.JOB_COMMIT_COMPLETED,
-                  JobEventType.JOB_COMMIT_FAILED))
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from KILL_ABORT state
           .addTransition(JobStateInternal.KILL_ABORT,
@@ -452,7 +472,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_SETUP_COMPLETED,
                   JobEventType.JOB_SETUP_FAILED,
                   JobEventType.JOB_COMMIT_COMPLETED,
-                  JobEventType.JOB_COMMIT_FAILED))
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from FAILED state
           .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
@@ -476,7 +497,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_SETUP_FAILED,
                   JobEventType.JOB_COMMIT_COMPLETED,
                   JobEventType.JOB_COMMIT_FAILED,
-                  JobEventType.JOB_ABORT_COMPLETED))
+                  JobEventType.JOB_ABORT_COMPLETED,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from KILLED state
           .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
@@ -498,7 +520,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_SETUP_FAILED,
                   JobEventType.JOB_COMMIT_COMPLETED,
                   JobEventType.JOB_COMMIT_FAILED,
-                  JobEventType.JOB_ABORT_COMPLETED))
+                  JobEventType.JOB_ABORT_COMPLETED,
+                  JobEventType.JOB_AM_REBOOT))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
@@ -517,9 +540,33 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_COMMIT_COMPLETED,
                   JobEventType.JOB_COMMIT_FAILED,
                   JobEventType.JOB_ABORT_COMPLETED,
-                  JobEventType.INTERNAL_ERROR))
+                  JobEventType.INTERNAL_ERROR,
+                  JobEventType.JOB_AM_REBOOT))
           .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+
+          // No transitions from REBOOT state. Ignore all.
+          .addTransition(
+              JobStateInternal.REBOOT,
+              JobStateInternal.REBOOT,
+              EnumSet.of(JobEventType.JOB_INIT,
+                  JobEventType.JOB_KILL,
+                  JobEventType.JOB_TASK_COMPLETED,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
+                  JobEventType.JOB_DIAGNOSTIC_UPDATE,
+                  JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_SETUP_COMPLETED,
+                  JobEventType.JOB_SETUP_FAILED,
+                  JobEventType.JOB_COMMIT_COMPLETED,
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_ABORT_COMPLETED,
+                  JobEventType.INTERNAL_ERROR,
+                  JobEventType.JOB_AM_REBOOT))
+          .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+
           // create the topology tables
           .installTopology();
  
@@ -904,6 +951,8 @@ public class JobImpl implements org.apac
       return JobState.RUNNING;
     case FAIL_ABORT:
       return JobState.FAILED;
+    case REBOOT:
+      return JobState.ERROR;
     default:
       return JobState.valueOf(smState.name());
     }
@@ -972,6 +1021,7 @@ public class JobImpl implements org.apac
       case KILLED:
         metrics.killedJob(this);
         break;
+      case REBOOT:
       case ERROR:
       case FAILED:
         metrics.failedJob(this);
@@ -1899,8 +1949,17 @@ public class JobImpl implements org.apac
     }
   }
   
-  private static class InternalErrorTransition implements
+  private static class InternalTerminationTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
+    JobStateInternal terminationState = null;
+    String jobHistoryString = null;
+    public InternalTerminationTransition(JobStateInternal stateInternal,
+        String jobHistoryString) {
+      this.terminationState = stateInternal;
+      //mostly a hack for jobhistoryserver
+      this.jobHistoryString = jobHistoryString;
+    }
+
     @Override
     public void transition(JobImpl job, JobEvent event) {
       //TODO Is this JH event required.
@@ -1908,9 +1967,21 @@ public class JobImpl implements org.apac
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobStateInternal.ERROR.toString());
+              jobHistoryString);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished(JobStateInternal.ERROR);
+      job.finished(terminationState);
+    }
+  }
+
+  private static class InternalErrorTransition extends InternalTerminationTransition {
+    public InternalErrorTransition(){
+      super(JobStateInternal.ERROR, JobStateInternal.ERROR.toString());
+    }
+  }
+
+  private static class InternalRebootTransition extends InternalTerminationTransition  {
+    public InternalRebootTransition(){
+      super(JobStateInternal.REBOOT, JobStateInternal.ERROR.toString());
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Mon Apr  8 16:12:32 2013
@@ -123,7 +123,7 @@ public class LocalContainerAllocator ext
       // This can happen if the RM has been restarted. If it is in that state,
       // this application must clean itself up.
       eventHandler.handle(new JobEvent(this.getJob().getID(),
-                                       JobEventType.INTERNAL_ERROR));
+                                       JobEventType.JOB_AM_REBOOT));
       throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
                                this.getContext().getApplicationID());
     }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Mon Apr  8 16:12:32 2013
@@ -574,7 +574,7 @@ public class RMContainerAllocator extend
       // This can happen if the RM has been restarted. If it is in that state,
       // this application must clean itself up.
       eventHandler.handle(new JobEvent(this.getJob().getID(),
-                                       JobEventType.INTERNAL_ERROR));
+                                       JobEventType.JOB_AM_REBOOT));
       throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
                                this.getContext().getApplicationID());
     }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Mon Apr  8 16:12:32 2013
@@ -33,7 +33,9 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -45,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -86,9 +89,68 @@ import org.junit.Test;
      attemptId.setApplicationId(appId);
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
-     MRAppMaster appMaster = new TestMRApp(attemptId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+     appMaster.init(conf);
+     appMaster.start();
+     appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+     verify(fs).delete(stagingJobPath, true);
+   }
+
+   @Test (timeout = 30000)
+   public void testNoDeletionofStagingOnReboot() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(0);
+     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.REBOOT, 4);
+     appMaster.init(conf);
+     appMaster.start();
+     //shutdown the job, not the lastRetry
+     appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(false, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+     verify(fs, times(0)).delete(stagingJobPath, true);
+   }
+
+   @Test (timeout = 30000)
+   public void testDeletionofStagingOnReboot() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(1);
+     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
      appMaster.init(conf);
+     appMaster.start();
+     //shutdown the job, is lastRetry
      appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
      verify(fs).delete(stagingJobPath, true);
    }
    
@@ -151,6 +213,8 @@ import org.junit.Test;
 
    private class TestMRApp extends MRAppMaster {
      ContainerAllocator allocator;
+     boolean testIsLastAMRetry = false;
+     JobStateInternal jobStateInternal;
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
          ContainerAllocator allocator, int maxAppAttempts) {
@@ -160,9 +224,11 @@ import org.junit.Test;
        this.allocator = allocator;
      }
 
-     public TestMRApp(ApplicationAttemptId applicationAttemptId) {
-       this(applicationAttemptId, null,
-           MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+     public TestMRApp(ApplicationAttemptId applicationAttemptId,
+         ContainerAllocator allocator, JobStateInternal jobStateInternal,
+             int maxAppAttempts) {
+       this(applicationAttemptId, allocator, maxAppAttempts);
+       this.jobStateInternal = jobStateInternal;
      }
 
      @Override
@@ -180,6 +246,31 @@ import org.junit.Test;
      }
 
      @Override
+     protected Job createJob(Configuration conf, JobStateInternal forcedState,
+         String diagnostic) {
+       JobImpl jobImpl = mock(JobImpl.class);
+       when(jobImpl.getInternalState()).thenReturn(this.jobStateInternal);
+       JobID jobID = JobID.forName("job_1234567890000_0001");
+       JobId jobId = TypeConverter.toYarn(jobID);
+       when(jobImpl.getID()).thenReturn(jobId);
+       ((AppContext) getContext())
+           .getAllJobs().put(jobImpl.getID(), jobImpl);
+       return jobImpl;
+     }
+
+     @Override
+     public void start() {
+       super.start();
+       DefaultMetricsSystem.shutdown();
+     }
+
+     @Override
+     public void notifyIsLastAMRetry(boolean isLastAMRetry){
+       testIsLastAMRetry = isLastAMRetry;
+       super.notifyIsLastAMRetry(isLastAMRetry);
+     }
+
+     @Override
      public RMHeartbeatHandler getRMHeartbeatHandler() {
        return getStubbedHeartbeatHandler(getContext());
      }
@@ -197,6 +288,9 @@ import org.junit.Test;
      protected void downloadTokensAndSetupUGI(Configuration conf) {
      }
 
+     public boolean getTestIsLastAMRetry(){
+       return testIsLastAMRetry;
+     }
    }
 
   private final class MRAppTestCleanup extends MRApp {
@@ -288,7 +382,7 @@ import org.junit.Test;
     };
   }
 
-  @Test
+  @Test(timeout=20000)
   public void testStagingCleanupOrder() throws Exception {
     MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
         this.getClass().getName(), true);

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Mon Apr  8 16:12:32 2013
@@ -195,6 +195,68 @@ public class TestJobImpl {
   }
 
   @Test(timeout=20000)
+  public void testRebootedDuringSetup() throws Exception{
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void setupJob(JobContext jobContext)
+          throws IOException {
+        while(!Thread.interrupted()){
+          try{
+            wait();
+          }catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+    assertJobState(job, JobStateInternal.REBOOT);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
+  public void testRebootedDuringCommit() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    syncBarrier.await();
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+    assertJobState(job, JobStateInternal.REBOOT);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
   public void testKilledDuringSetup() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);