You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/09/03 20:30:20 UTC

svn commit: r1519787 [1/2] - in /hadoop/common/branches/YARN-321/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/...

Author: vinodkv
Date: Tue Sep  3 18:30:05 2013
New Revision: 1519787

URL: http://svn.apache.org/r1519787
Log:
Forwarding YARN-321 branch to latest branch-2.

Modified:
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-examples/   (props changed)
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
    hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraScheduler.java

Propchange: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-2/hadoop-mapreduce-project:r1513206-1519783

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/CHANGES.txt?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/CHANGES.txt Tue Sep  3 18:30:05 2013
@@ -25,8 +25,7 @@ Release 2.3.0 - UNRELEASED
 
   OPTIMIZATIONS
 
-    MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
-    (Hairong Kuang and Jason Lowe via jlowe)
+    MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
 
   BUG FIXES
 
@@ -47,14 +46,20 @@ Release 2.1.1-beta - UNRELEASED
 
   IMPROVEMENTS
 
-  OPTIMIZATIONS
+    MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit
+    subclass (Sandy Ryza)
 
-    MAPREDUCE-5352. Optimize node local splits generated by
-    CombineFileInputFormat. (sseth)
+  OPTIMIZATIONS
 
     MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
     conditions (jlowe via kihwal)
 
+    MAPREDUCE-5462. In map-side sort, swap entire meta entries instead of
+    indexes for better cache performance. (Sandy Ryza)
+
+    MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
+    (Hairong Kuang and Jason Lowe via jlowe)
+
   BUG FIXES
 
     MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit
@@ -83,7 +88,33 @@ Release 2.1.1-beta - UNRELEASED
     MAPREDUCE-5425. Junit in TestJobHistoryServer failing in jdk 7 (Robert
     Parker via jlowe)
 
-Release 2.1.0-beta - 2013-08-06
+    MAPREDUCE-5454. TestDFSIO fails intermittently on JDK7 (Karthik Kambatla
+    via Sandy Ryza)
+
+    MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
+    failures (Sandy Ryza via jlowe)
+
+    MAPREDUCE-5466. Changed MR AM to not promote history files of intermediate
+    AMs in case they are exiting because of errors and thus help history-server
+    pick up the right history file for the last successful AM. (Jian He via
+    vinodkv)
+
+    MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via
+    acmurthy)
+
+    MAPREDUCE-5470. LocalJobRunner does not work on Windows. (Sandy Ryza via
+    cnauroth)
+
+    MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory
+    only after unregistering from the RM. (Jian He via vinodkv)
+
+    MAPREDUCE-5483. revert MAPREDUCE-5357. (rkanter via tucu)
+
+    MAPREDUCE-5441. Changed MR AM to return RUNNING state if exiting when RM
+    commands to reboot, so that client can continue to track the overall job.
+    (Jian He via vinodkv)
+
+Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES
 
@@ -255,6 +286,9 @@ Release 2.1.0-beta - 2013-08-06
     MAPREDUCE-5268. Improve history server startup performance (Karthik
     Kambatla via jlowe)
 
+    MAPREDUCE-5352. Optimize node local splits generated by
+    CombineFileInputFormat. (sseth)
+
   BUG FIXES
 
     MAPREDUCE-4671. AM does not tell the RM about container requests which are
@@ -1167,6 +1201,9 @@ Release 0.23.10 - UNRELEASED
 
     MAPREDUCE-5440. TestCopyCommitter Fails on JDK7 (Robert Parker via jlowe)
 
+    MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
+    failures (Sandy Ryza via jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt:r1513206-1519783

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Tue Sep  3 18:30:05 2013
@@ -75,9 +75,9 @@ class YarnChild {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     LOG.debug("Child starting");
 
-    final JobConf defaultConf = new JobConf();
-    defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
-    UserGroupInformation.setConfiguration(defaultConf);
+    final JobConf job = new JobConf();
+    job.addResource(MRJobConfig.JOB_CONF_FILE);
+    UserGroupInformation.setConfiguration(job);
 
     String host = args[0];
     int port = Integer.parseInt(args[1]);
@@ -111,7 +111,7 @@ class YarnChild {
       @Override
       public TaskUmbilicalProtocol run() throws Exception {
         return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
-            TaskUmbilicalProtocol.versionID, address, defaultConf);
+            TaskUmbilicalProtocol.versionID, address, job);
       }
     });
 
@@ -140,7 +140,7 @@ class YarnChild {
       YarnChild.taskid = task.getTaskID();
 
       // Create the job-conf and set credentials
-      final JobConf job = configureTask(task, credentials, jt);
+      configureTask(job, task, credentials, jt);
 
       // Initiate Java VM metrics
       JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
@@ -254,11 +254,10 @@ class YarnChild {
     job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
   }
 
-  private static JobConf configureTask(Task task, Credentials credentials,
-      Token<JobTokenIdentifier> jt) throws IOException {
-    final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
+  private static void configureTask(JobConf job, Task task,
+      Credentials credentials, Token<JobTokenIdentifier> jt) throws IOException {
     job.setCredentials(credentials);
-
+    
     ApplicationAttemptId appAttemptId =
         ConverterUtils.toContainerId(
             System.getenv(Environment.CONTAINER_ID.name()))
@@ -300,7 +299,6 @@ class YarnChild {
     writeLocalJobFile(localTaskFile, job);
     task.setJobFile(localTaskFile.toString());
     task.setConf(job);
-    return job;
   }
 
   /**

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Sep  3 18:30:05 2013
@@ -520,7 +520,7 @@ public class JobHistoryEventHandler exte
         mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
         mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
       }
-     
+
       // If this is JobFinishedEvent, close the writer and setup the job-index
       if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
         try {
@@ -532,6 +532,24 @@ public class JobHistoryEventHandler exte
               jFinishedEvent.getFinishedReduces());
           mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
           closeEventWriter(event.getJobID());
+          processDoneFiles(event.getJobID());
+        } catch (IOException e) {
+          throw new YarnRuntimeException(e);
+        }
+      }
+      // In case of JOB_ERROR, only process the Done files (e.g. job
+      // summary, job history file, etc.) if this is the last AM retry.
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_ERROR) {
+        try {
+          JobUnsuccessfulCompletionEvent jucEvent =
+              (JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
+          mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
+          mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
+          mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
+          mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
+          closeEventWriter(event.getJobID());
+          if(context.isLastAMRetry())
+            processDoneFiles(event.getJobID());
         } catch (IOException e) {
           throw new YarnRuntimeException(e);
         }
@@ -548,6 +566,7 @@ public class JobHistoryEventHandler exte
           mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
           mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
           closeEventWriter(event.getJobID());
+          processDoneFiles(event.getJobID());
         } catch (IOException e) {
           throw new YarnRuntimeException(e);
         }
@@ -634,7 +653,6 @@ public class JobHistoryEventHandler exte
   }
 
   protected void closeEventWriter(JobId jobId) throws IOException {
-
     final MetaInfo mi = fileMap.get(jobId);
     if (mi == null) {
       throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
@@ -654,7 +672,15 @@ public class JobHistoryEventHandler exte
       LOG.error("Error closing writer for JobID: " + jobId);
       throw e;
     }
-     
+  }
+
+  protected void processDoneFiles(JobId jobId) throws IOException {
+
+    final MetaInfo mi = fileMap.get(jobId);
+    if (mi == null) {
+      throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
+    }
+
     if (mi.getHistoryFile() == null) {
       LOG.warn("No file for job-history with " + jobId + " found in cache!");
     }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java Tue Sep  3 18:30:05 2013
@@ -61,4 +61,6 @@ public interface AppContext {
   Set<String> getBlacklistedNodes();
   
   ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
+
+  boolean isLastAMRetry();
 }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Sep  3 18:30:05 2013
@@ -325,18 +325,23 @@ public class MRAppMaster extends Composi
         dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
             eater);
       }
-      
+
+      if (copyHistory) {
+        // Now that the RM has a FINISHING state for applications, which gives
+        // AMs plenty of time to clean up after unregistering, it is safe to
+        // clean the staging directory after unregistering with the RM. So we
+        // start the staging-dir cleaner BEFORE the ContainerAllocator so that on
+        // shut-down the ContainerAllocator unregisters first and then the
+        // staging-dir cleaner deletes the staging directory.
+        addService(createStagingDirCleaningService());
+      }
+
       // service to allocate containers from RM (if non-uber) or to fake it (uber)
       containerAllocator = createContainerAllocator(null, context);
       addIfService(containerAllocator);
       dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
 
       if (copyHistory) {
-        // Add the staging directory cleaner before the history server but after
-        // the container allocator so the staging directory is cleaned after
-        // the history has been flushed but before unregistering with the RM.
-        addService(createStagingDirCleaningService());
-
         // Add the JobHistoryEventHandler last so that it is properly stopped first.
         // This will guarantee that all history-events are flushed before AM goes
         // ahead with shutdown.
@@ -344,7 +349,6 @@ public class MRAppMaster extends Composi
         // component creates a JobHistoryEvent in the meanwhile, it will be just be
         // queued inside the JobHistoryEventHandler 
         addIfService(historyService);
-        
 
         JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
             dispatcher.getEventHandler());
@@ -396,6 +400,14 @@ public class MRAppMaster extends Composi
       dispatcher.register(Speculator.EventType.class,
           speculatorEventDispatcher);
 
+      // Now that the RM has a FINISHING state for applications, which gives
+      // AMs plenty of time to clean up after unregistering, it is safe to
+      // clean the staging directory after unregistering with the RM. So we
+      // start the staging-dir cleaner BEFORE the ContainerAllocator so that on
+      // shut-down the ContainerAllocator unregisters first and then the
+      // staging-dir cleaner deletes the staging directory.
+      addService(createStagingDirCleaningService());
+
       // service to allocate containers from RM (if non-uber) or to fake it (uber)
       addIfService(containerAllocator);
       dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
@@ -405,11 +417,6 @@ public class MRAppMaster extends Composi
       addIfService(containerLauncher);
       dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
 
-      // Add the staging directory cleaner before the history server but after
-      // the container allocator so the staging directory is cleaned after
-      // the history has been flushed but before unregistering with the RM.
-      addService(createStagingDirCleaningService());
-
       // Add the JobHistoryEventHandler last so that it is properly stopped first.
       // This will guarantee that all history-events are flushed before AM goes
       // ahead with shutdown.
@@ -619,12 +626,6 @@ public class MRAppMaster extends Composi
     }
   }
 
-  protected void addIfService(Object object) {
-    if (object instanceof Service) {
-      addService((Service) object);
-    }
-  }
-
   protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
       AppContext context) {
     this.jobHistoryEventHandler = new JobHistoryEventHandler(context,
@@ -952,6 +953,11 @@ public class MRAppMaster extends Composi
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return clientToAMTokenSecretManager;
     }
+
+    @Override
+    public boolean isLastAMRetry(){
+      return isLastAMRetry;
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -1037,11 +1043,11 @@ public class MRAppMaster extends Composi
     // attempt will generate one.  However that disables recovery if there
     // are reducers as the shuffle secret would be app attempt specific.
     int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
-    boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
-        TokenCache.getShuffleSecretKey(jobCredentials) != null);
+    boolean shuffleKeyValidForRecovery =
+        TokenCache.getShuffleSecretKey(jobCredentials) != null;
 
     if (recoveryEnabled && recoverySupportedByCommitter
-          && shuffleKeyValidForRecovery) {
+        && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
       LOG.info("Recovery is enabled. "
           + "Will try to recover from previous life on best effort basis.");
       try {
@@ -1054,7 +1060,8 @@ public class MRAppMaster extends Composi
     } else {
       LOG.info("Will not try to recover. recoveryEnabled: "
             + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+            + recoverySupportedByCommitter + " numReduceTasks: "
+            + numReduceTasks + " shuffleKeyValidForRecovery: "
             + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
             + appAttemptID.getAttemptId());
       // Get the amInfos anyways whether recovery is enabled or not

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Sep  3 18:30:05 2013
@@ -993,7 +993,7 @@ public class JobImpl implements org.apac
     }
   }
   
-  private static JobState getExternalState(JobStateInternal smState) {
+  private JobState getExternalState(JobStateInternal smState) {
     switch (smState) {
     case KILL_WAIT:
     case KILL_ABORT:
@@ -1005,7 +1005,13 @@ public class JobImpl implements org.apac
     case FAIL_ABORT:
       return JobState.FAILED;
     case REBOOT:
-      return JobState.ERROR;
+      if (appContext.isLastAMRetry()) {
+        return JobState.ERROR;
+      } else {
+        // If this is not the last retry, report the external state as RUNNING;
+        // otherwise JobClient will exit when it polls the AM for the job state.
+        return JobState.RUNNING;
+      }
     default:
       return JobState.valueOf(smState.name());
     }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Sep  3 18:30:05 2013
@@ -36,9 +36,12 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -223,6 +226,7 @@ public abstract class RMCommunicator ext
 
   protected void startAllocatorThread() {
     allocatorThread = new Thread(new Runnable() {
+      @SuppressWarnings("unchecked")
       @Override
       public void run() {
         while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
@@ -233,6 +237,15 @@ public abstract class RMCommunicator ext
             } catch (YarnRuntimeException e) {
               LOG.error("Error communicating with RM: " + e.getMessage() , e);
               return;
+            } catch (InvalidToken e) {
+              // This can happen if the RM has been restarted: currently, when
+              // the RM restarts, the AMRMToken is not yet repopulated into the
+              // AMRMTokenSecretManager. Once that is fixed, there is no need
+              // to send the JOB_AM_REBOOT event from this method any more.
+              eventHandler.handle(new JobEvent(job.getID(),
+                JobEventType.JOB_AM_REBOOT));
+              LOG.error("Error in authencating with RM: " ,e);
+              return;
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
               continue;

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Tue Sep  3 18:30:05 2013
@@ -25,10 +25,13 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
 
 import java.io.File;
 import java.io.IOException;
 
+import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +46,7 @@ import org.apache.hadoop.mapreduce.TypeC
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -229,6 +233,98 @@ public class TestJobHistoryEventHandler 
     }
   }
 
+  // In case of all types of events, process Done files if it's last AM retry
+  @Test (timeout=50000)
+  public void testProcessDoneFilesOnLastAMRetry() throws Exception {
+    TestParams t = new TestParams(true);
+    Configuration conf = new Configuration();
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      verify(jheh, times(0)).processDoneFiles(any(JobId.class));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.ERROR.toString())));
+      verify(jheh, times(1)).processDoneFiles(any(JobId.class));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+        TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+        new Counters(), new Counters())));
+      verify(jheh, times(2)).processDoneFiles(any(JobId.class));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.FAILED.toString())));
+      verify(jheh, times(3)).processDoneFiles(any(JobId.class));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.KILLED.toString())));
+      verify(jheh, times(4)).processDoneFiles(any(JobId.class));
+
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter, times(5)).write(any(HistoryEvent.class));
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  // Skip processing Done files in case of ERROR, if it's not last AM retry
+  @Test (timeout=50000)
+  public void testProcessDoneFilesNotLastAMRetry() throws Exception {
+    TestParams t = new TestParams(false);
+    Configuration conf = new Configuration();
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      verify(jheh, times(0)).processDoneFiles(t.jobId);
+
+      // skip processing done files
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.ERROR.toString())));
+      verify(jheh, times(0)).processDoneFiles(t.jobId);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+          TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+          new Counters(), new Counters())));
+      verify(jheh, times(1)).processDoneFiles(t.jobId);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.FAILED.toString())));
+      verify(jheh, times(2)).processDoneFiles(t.jobId);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.KILLED.toString())));
+      verify(jheh, times(3)).processDoneFiles(t.jobId);
+
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter, times(5)).write(any(HistoryEvent.class));
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
   private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
     jheh.handle(event);
   }
@@ -258,20 +354,23 @@ public class TestJobHistoryEventHandler 
     }
   }
 
-  private AppContext mockAppContext(ApplicationId appId) {
+  private AppContext mockAppContext(ApplicationId appId, boolean isLastAMRetry) {
     JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
     AppContext mockContext = mock(AppContext.class);
     Job mockJob = mock(Job.class);
+    when(mockJob.getAllCounters()).thenReturn(new Counters());
     when(mockJob.getTotalMaps()).thenReturn(10);
     when(mockJob.getTotalReduces()).thenReturn(10);
     when(mockJob.getName()).thenReturn("mockjob");
     when(mockContext.getJob(jobId)).thenReturn(mockJob);
     when(mockContext.getApplicationID()).thenReturn(appId);
+    when(mockContext.isLastAMRetry()).thenReturn(isLastAMRetry);
     return mockContext;
   }
 
 
   private class TestParams {
+    boolean isLastAMRetry;
     String workDir = setupTestWorkDir();
     ApplicationId appId = ApplicationId.newInstance(200, 1);
     ApplicationAttemptId appAttemptId =
@@ -279,7 +378,15 @@ public class TestJobHistoryEventHandler 
     ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
     TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-    AppContext mockAppContext = mockAppContext(appId);
+    AppContext mockAppContext;
+
+    public TestParams() {
+      this(false);
+    }
+    public TestParams(boolean isLastAMRetry) {
+      this.isLastAMRetry = isLastAMRetry;
+      mockAppContext = mockAppContext(appId, this.isLastAMRetry);
+    }
   }
 
   private JobHistoryEvent getEventToEnqueue(JobId jobId) {
@@ -344,7 +451,6 @@ public class TestJobHistoryEventHandler 
 class JHEvenHandlerForTest extends JobHistoryEventHandler {
 
   private EventWriter eventWriter;
-
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
   }
@@ -367,6 +473,11 @@ class JHEvenHandlerForTest extends JobHi
   public EventWriter getEventWriter() {
     return this.eventWriter;
   }
+
+  @Override
+  protected void processDoneFiles(JobId jobId){
+    // do nothing
+  }
 }
 
 /**

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java Tue Sep  3 18:30:05 2013
@@ -130,4 +130,9 @@ public class MockAppContext implements A
     // Not implemented
     return null;
   }
+
+  @Override
+  public boolean isLastAMRetry() {
+    return false;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue Sep  3 18:30:05 2013
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -41,6 +42,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -51,12 +54,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Clock;
 import org.junit.Test;
 
 /**
@@ -368,6 +374,47 @@ public class TestMRApp {
     app.waitForState(job, JobState.ERROR);
   }
 
+  @Test
+  public void testJobRebootNotLastRetry() throws Exception {
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+
+    // send a reboot event
+    app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
+      JobEventType.JOB_AM_REBOOT));
+
+    // report the external state as RUNNING since otherwise the JobClient
+    // will exit prematurely.
+    app.waitForState(job, JobState.RUNNING);
+  }
+
+  @Test
+  public void testJobRebootOnLastRetry() throws Exception {
+    // set startCount to 2 since this is the last retry, which equals
+    // DEFAULT_MAX_AM_RETRY
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2);
+
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+
+    // send a reboot event
+    app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
+      JobEventType.JOB_AM_REBOOT));
+
+    // report the external state as ERROR if this is the last retry
+    app.waitForState(job, JobState.ERROR);
+  }
+
   private final class MRAppWithSpiedJob extends MRApp {
     private JobImpl spiedJob;
 

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Sep  3 18:30:05 2013
@@ -114,7 +114,6 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
-
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -316,6 +315,116 @@ public class TestRecovery {
     // available in the failed attempt should be available here
   }
 
+  /**
+   * AM with 3 maps and 0 reduces. AM crashes after the first two tasks finish
+   * and recovers completely and succeeds in the second generation.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testCrashOfMapsOnlyJob() throws Exception {
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
+          ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // all maps would be running
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE event, make sure each attempt has reached
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 1st two maps
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // in the rerun, the first two maps will be recovered from the previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has reached
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app
+      .getContext()
+      .getEventHandler()
+      .handle(
+        new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
+          .getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait to get it completed
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   public void testMultipleCrashes() throws Exception {
 

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Tue Sep  3 18:30:05 2013
@@ -862,5 +862,10 @@ public class TestRuntimeEstimators {
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return null;
     }
+
+    @Override
+    public boolean isLastAMRetry() {
+      return false;
+    }
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Tue Sep  3 18:30:05 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -54,6 +56,7 @@ import org.apache.hadoop.service.Service
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -279,14 +282,17 @@ import org.junit.Test;
    }
 
   private final class MRAppTestCleanup extends MRApp {
-    boolean stoppedContainerAllocator;
-    boolean cleanedBeforeContainerAllocatorStopped;
-
+    int stagingDirCleanedup;
+    int ContainerAllocatorStopped;
+    int JobHistoryEventHandlerStopped;
+    int numStops;
     public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart) {
       super(maps, reduces, autoComplete, testName, cleanOnStart);
-      stoppedContainerAllocator = false;
-      cleanedBeforeContainerAllocatorStopped = false;
+      stagingDirCleanedup = 0;
+      ContainerAllocatorStopped = 0;
+      JobHistoryEventHandlerStopped = 0;
+      numStops = 0;
     }
 
     @Override
@@ -313,6 +319,26 @@ import org.junit.Test;
     }
 
     @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new TestJobHistoryEventHandler(context, getStartCount());
+    }
+
+    private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
+
+      public TestJobHistoryEventHandler(AppContext context, int startCount) {
+        super(context, startCount);
+      }
+
+      @Override
+      public void serviceStop() throws Exception {
+        numStops++;
+        JobHistoryEventHandlerStopped = numStops;
+        super.serviceStop();
+      }
+    }
+
+    @Override
     protected ContainerAllocator createContainerAllocator(
         ClientService clientService, AppContext context) {
       return new TestCleanupContainerAllocator();
@@ -334,7 +360,8 @@ import org.junit.Test;
 
       @Override
       protected void serviceStop() throws Exception {
-        stoppedContainerAllocator = true;
+        numStops++;
+        ContainerAllocatorStopped = numStops;
         super.serviceStop();
       }
     }
@@ -346,7 +373,8 @@ import org.junit.Test;
 
     @Override
     public void cleanupStagingDir() throws IOException {
-      cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
+      numStops++;
+      stagingDirCleanedup = numStops;
     }
 
     @Override
@@ -377,11 +405,15 @@ import org.junit.Test;
     app.verifyCompleted();
 
     int waitTime = 20 * 1000;
-    while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
+    while (waitTime > 0 && app.numStops < 3 ) {
       Thread.sleep(100);
       waitTime -= 100;
     }
-    Assert.assertTrue("Staging directory not cleaned before notifying RM",
-        app.cleanedBeforeContainerAllocatorStopped);
+
+    // assert JobHistoryEventHandlerStopped first, then
+    // ContainerAllocatorStopped, and then stagingDirCleanedup
+    Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
+    Assert.assertEquals(2, app.ContainerAllocatorStopped);
+    Assert.assertEquals(3, app.stagingDirCleanedup);
   }
  }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Sep  3 18:30:05 2013
@@ -142,7 +142,7 @@ public class TestJobImpl {
         "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
         "tag1,tag2");
     dispatcher.register(EventType.class, jseHandler);
-    JobImpl job = createStubbedJob(conf, dispatcher, 0);
+    JobImpl job = createStubbedJob(conf, dispatcher, 0, null);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
     job.handle(new JobStartEvent(job.getID()));
@@ -170,7 +170,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
@@ -195,7 +195,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
@@ -239,7 +239,9 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    AppContext mockContext = mock(AppContext.class);
+    when(mockContext.isLastAMRetry()).thenReturn(false);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -248,6 +250,10 @@ public class TestJobImpl {
 
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
     assertJobState(job, JobStateInternal.REBOOT);
+    // return the external state as RUNNING since otherwise JobClient will
+    // exit when it polls the AM for job state
+    Assert.assertEquals(JobState.RUNNING, job.getState());
+
     dispatcher.stop();
     commitHandler.stop();
   }
@@ -256,6 +262,7 @@ public class TestJobImpl {
   public void testRebootedDuringCommit() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -266,13 +273,18 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    AppContext mockContext = mock(AppContext.class);
+    when(mockContext.isLastAMRetry()).thenReturn(true);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
     syncBarrier.await();
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
     assertJobState(job, JobStateInternal.REBOOT);
+    // report the external state as ERROR since this is the last retry.
+    Assert.assertEquals(JobState.ERROR, job.getState());
+
     dispatcher.stop();
     commitHandler.stop();
   }
@@ -301,7 +313,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -328,7 +340,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
@@ -352,7 +364,7 @@ public class TestJobImpl {
         createCommitterEventHandler(dispatcher, committer);
     commitHandler.init(conf);
     commitHandler.start();
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
 
     //Fail one task. This should land the JobImpl in the FAIL_WAIT state
     job.handle(new JobTaskEvent(
@@ -388,7 +400,7 @@ public class TestJobImpl {
     //Job has only 1 mapper task. No reducers
     conf.setInt(MRJobConfig.NUM_REDUCES, 0);
     conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 1);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null);
 
     //Fail / finish all the tasks. This should land the JobImpl directly in the
     //FAIL_ABORT state
@@ -440,7 +452,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -477,7 +489,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -687,7 +699,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -735,12 +747,12 @@ public class TestJobImpl {
   }
 
   private static StubbedJob createStubbedJob(Configuration conf,
-      Dispatcher dispatcher, int numSplits) {
+      Dispatcher dispatcher, int numSplits, AppContext appContext) {
     JobID jobID = JobID.forName("job_1234567890000_0001");
     JobId jobId = TypeConverter.toYarn(jobID);
     StubbedJob job = new StubbedJob(jobId,
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
-        conf,dispatcher.getEventHandler(), true, "somebody", numSplits);
+        conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);
     dispatcher.register(JobEventType.class, job);
     EventHandler mockHandler = mock(EventHandler.class);
     dispatcher.register(TaskEventType.class, mockHandler);
@@ -751,8 +763,8 @@ public class TestJobImpl {
   }
 
   private static StubbedJob createRunningStubbedJob(Configuration conf,
-      Dispatcher dispatcher, int numSplits) {
-    StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
+      Dispatcher dispatcher, int numSplits, AppContext appContext) {
+    StubbedJob job = createStubbedJob(conf, dispatcher, numSplits, appContext);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
     job.handle(new JobStartEvent(job.getID()));
@@ -880,13 +892,13 @@ public class TestJobImpl {
     }
 
     public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
-        Configuration conf, EventHandler eventHandler,
-        boolean newApiCommitter, String user, int numSplits) {
+        Configuration conf, EventHandler eventHandler, boolean newApiCommitter,
+        String user, int numSplits, AppContext appContext) {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
           new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
           MRAppMetrics.create(), null, newApiCommitter, user,
-          System.currentTimeMillis(), null, null, null, null);
+          System.currentTimeMillis(), null, appContext, null, null);
 
       initTransition = getInitTransition(numSplits);
       localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Tue Sep  3 18:30:05 2013
@@ -64,6 +64,8 @@
               <goal>protoc</goal>
             </goals>
             <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
               <imports>
                 <param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
                 <param>${basedir}/../../../hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto</param>

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Sep  3 18:30:05 2013
@@ -227,7 +227,7 @@ public class LocalJobRunner implements C
             info.getSplitIndex(), 1);
           map.setUser(UserGroupInformation.getCurrentUser().
               getShortUserName());
-          setupChildMapredLocalDirs(localJobDir, map, localConf);
+          setupChildMapredLocalDirs(map, localConf);
 
           MapOutputFile mapOutput = new MROutputFiles();
           mapOutput.setConf(localConf);
@@ -305,7 +305,7 @@ public class LocalJobRunner implements C
               reduceId, taskId, mapIds.size(), 1);
           reduce.setUser(UserGroupInformation.getCurrentUser().
               getShortUserName());
-          setupChildMapredLocalDirs(localJobDir, reduce, localConf);
+          setupChildMapredLocalDirs(reduce, localConf);
           reduce.setLocalMapFiles(mapOutputFiles);
 
           if (!Job.this.isInterrupted()) {
@@ -958,16 +958,18 @@ public class LocalJobRunner implements C
     throw new UnsupportedOperationException("Not supported");
   }
   
-  static void setupChildMapredLocalDirs(Path localJobDir, Task t, JobConf conf) {
+  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
     String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
+    String jobId = t.getJobID().toString();
     String taskId = t.getTaskID().toString();
     boolean isCleanup = t.isTaskCleanupTask();
+    String user = t.getUser();
     StringBuffer childMapredLocalDir =
         new StringBuffer(localDirs[0] + Path.SEPARATOR
-            + getLocalTaskDir(localJobDir, taskId, isCleanup));
+            + getLocalTaskDir(user, jobId, taskId, isCleanup));
     for (int i = 1; i < localDirs.length; i++) {
       childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
-          + getLocalTaskDir(localJobDir, taskId, isCleanup));
+          + getLocalTaskDir(user, jobId, taskId, isCleanup));
     }
     LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
     conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
@@ -976,9 +978,10 @@ public class LocalJobRunner implements C
   static final String TASK_CLEANUP_SUFFIX = ".cleanup";
   static final String JOBCACHE = "jobcache";
   
-  static String getLocalTaskDir(Path localJobDir, String taskid,
+  static String getLocalTaskDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    String taskDir = localJobDir.toString() + Path.SEPARATOR + taskid;
+    String taskDir = jobDir + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+      + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
     if (isCleanupAttempt) {
       taskDir = taskDir + TASK_CLEANUP_SUFFIX;
     }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Tue Sep  3 18:30:05 2013
@@ -129,6 +129,15 @@ public class JHAdminConfig {
   public static final int DEFAULT_MR_HISTORY_WEBAPP_PORT = 19888;
   public static final String DEFAULT_MR_HISTORY_WEBAPP_ADDRESS =
     "0.0.0.0:" + DEFAULT_MR_HISTORY_WEBAPP_PORT;
+  
+  /**The kerberos principal to be used for spnego filter for history server*/
+  public static final String MR_WEBAPP_SPNEGO_USER_NAME_KEY =
+      MR_HISTORY_PREFIX + "webapp.spnego-principal";
+  
+  /** The kerberos keytab to be used for spnego filter for history server*/
+  public static final String MR_WEBAPP_SPNEGO_KEYTAB_FILE_KEY =
+      MR_HISTORY_PREFIX + "webapp.spnego-keytab-file";
+
   /*
    * HS Service Authorization
    */

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Tue Sep  3 18:30:05 2013
@@ -269,6 +269,7 @@
           "JOB_STATUS_CHANGED",
           "JOB_FAILED",
           "JOB_KILLED",
+          "JOB_ERROR",
           "JOB_INFO_CHANGED",
           "TASK_STARTED",
           "TASK_FINISHED",

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Tue Sep  3 18:30:05 2013
@@ -884,10 +884,10 @@ public class MapTask extends Task {
     byte[] kvbuffer;        // main output buffer
     private final byte[] b0 = new byte[0];
 
-    private static final int INDEX = 0;            // index offset in acct
-    private static final int VALSTART = 1;         // val offset in acct
-    private static final int KEYSTART = 2;         // key offset in acct
-    private static final int PARTITION = 3;        // partition offset in acct
+    private static final int VALSTART = 0;         // val offset in acct
+    private static final int KEYSTART = 1;         // key offset in acct
+    private static final int PARTITION = 2;        // partition offset in acct
+    private static final int VALLEN = 3;           // length of value
     private static final int NMETA = 4;            // num meta ints
     private static final int METASIZE = NMETA * 4; // size in bytes
 
@@ -1151,10 +1151,10 @@ public class MapTask extends Task {
             distanceTo(keystart, valend, bufvoid));
 
         // write accounting info
-        kvmeta.put(kvindex + INDEX, kvindex);
         kvmeta.put(kvindex + PARTITION, partition);
         kvmeta.put(kvindex + KEYSTART, keystart);
         kvmeta.put(kvindex + VALSTART, valstart);
+        kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
         // advance kvindex
         kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
       } catch (MapBufferTooSmallException e) {
@@ -1224,17 +1224,11 @@ public class MapTask extends Task {
     }
 
     /**
-     * For the given meta position, return the dereferenced position in the
-     * integer array. Each meta block contains several integers describing
-     * record data in its serialized form, but the INDEX is not necessarily
-     * related to the proximate metadata. The index value at the referenced int
-     * position is the start offset of the associated metadata block. So the
-     * metadata INDEX at metapos may point to the metadata described by the
-     * metadata block at metapos + k, which contains information about that
-     * serialized record.
+     * For the given meta position, return the offset into the int-sized
+     * kvmeta buffer.
      */
     int offsetFor(int metapos) {
-      return kvmeta.get(metapos * NMETA + INDEX);
+      return metapos * NMETA;
     }
 
     /**
@@ -1260,16 +1254,17 @@ public class MapTask extends Task {
           kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
     }
 
+    final byte META_BUFFER_TMP[] = new byte[METASIZE];
     /**
-     * Swap logical indices st i, j MOD offset capacity.
+     * Swap metadata for items i, j
      * @see IndexedSortable#swap
      */
     public void swap(final int mi, final int mj) {
-      final int kvi = (mi % maxRec) * NMETA + INDEX;
-      final int kvj = (mj % maxRec) * NMETA + INDEX;
-      int tmp = kvmeta.get(kvi);
-      kvmeta.put(kvi, kvmeta.get(kvj));
-      kvmeta.put(kvj, tmp);
+      int iOff = (mi % maxRec) * METASIZE;
+      int jOff = (mj % maxRec) * METASIZE;
+      System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE);
+      System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE);
+      System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE);
     }
 
     /**
@@ -1601,9 +1596,9 @@ public class MapTask extends Task {
               while (spindex < mend &&
                   kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                 final int kvoff = offsetFor(spindex % maxRec);
-                key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
-                          (kvmeta.get(kvoff + VALSTART) -
-                           kvmeta.get(kvoff + KEYSTART)));
+                int keystart = kvmeta.get(kvoff + KEYSTART);
+                int valstart = kvmeta.get(kvoff + VALSTART);
+                key.reset(kvbuffer, keystart, valstart - keystart);
                 getVBytesForOffset(kvoff, value);
                 writer.append(key, value);
                 ++spindex;
@@ -1729,14 +1724,8 @@ public class MapTask extends Task {
     private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
       // get the keystart for the next serialized value to be the end
       // of this value. If this is the last value in the buffer, use bufend
-      final int nextindex = kvoff == kvend
-        ? bufend
-        : kvmeta.get(
-            (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
-      // calculate the length of the value
-      int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
-        ? nextindex - kvmeta.get(kvoff + VALSTART)
-        : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
+      final int vallen = kvmeta.get(kvoff + VALLEN);
+      assert vallen >= 0;
       vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
     }
 

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java Tue Sep  3 18:30:05 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
@@ -181,7 +182,18 @@ public class Cluster {
   public Job getJob(JobID jobId) throws IOException, InterruptedException {
     JobStatus status = client.getJobStatus(jobId);
     if (status != null) {
-      return Job.getInstance(this, status, new JobConf(status.getJobFile()));
+      JobConf conf;
+      try {
+        conf = new JobConf(status.getJobFile());
+      } catch (RuntimeException ex) {
+        // If job file doesn't exist it means we can't find the job
+        if (ex.getCause() instanceof FileNotFoundException) {
+          return null;
+        } else {
+          throw ex;
+        }
+      }
+      return Job.getInstance(this, status, conf);
     }
     return null;
   }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Tue Sep  3 18:30:05 2013
@@ -124,7 +124,6 @@ public class JobSubmissionFiles {
     } else {
       fs.mkdirs(stagingArea, 
           new FsPermission(JOB_DIR_PERMISSION));
-      fs.setOwner(stagingArea, currentUser, null);
     }
     return stagingArea;
   }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Tue Sep  3 18:30:05 2013
@@ -104,6 +104,8 @@ public class EventReader implements Clos
       result = new JobUnsuccessfulCompletionEvent(); break;
     case JOB_KILLED:
       result = new JobUnsuccessfulCompletionEvent(); break;
+    case JOB_ERROR:
+      result = new JobUnsuccessfulCompletionEvent(); break;
     case JOB_INFO_CHANGED:
       result = new JobInfoChangeEvent(); break;
     case TASK_STARTED:

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Sep  3 18:30:05 2013
@@ -185,6 +185,7 @@ public class JobHistoryParser implements
       break;
     case JOB_FAILED:
     case JOB_KILLED:
+    case JOB_ERROR:
       handleJobFailedEvent((JobUnsuccessfulCompletionEvent) event);
       break;
     case JOB_FINISHED:

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java Tue Sep  3 18:30:05 2013
@@ -72,6 +72,8 @@ public class JobUnsuccessfulCompletionEv
   public EventType getEventType() {
     if ("FAILED".equals(getStatus())) {
       return EventType.JOB_FAILED;
+    } else if ("ERROR".equals(getStatus())) {
+      return EventType.JOB_ERROR;
     } else
       return EventType.JOB_KILLED;
   }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java Tue Sep  3 18:30:05 2013
@@ -102,7 +102,8 @@ public class TestJobEndNotifier extends 
   public void setUp() throws Exception {
     new File(System.getProperty("build.webapps", "build/webapps") + "/test"
         ).mkdirs();
-    server = new HttpServer("test", "0.0.0.0", 0, true);
+    server = new HttpServer.Builder().setName("test")
+        .setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build();
     server.addServlet("delay", "/delay", DelayServlet.class);
     server.addServlet("jobend", "/jobend", JobEndServlet.class);
     server.addServlet("fail", "/fail", FailServlet.class);

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml Tue Sep  3 18:30:05 2013
@@ -78,6 +78,8 @@
 				<goal>protoc</goal>
 				</goals>
 				<configuration>
+        <protocVersion>${protobuf.version}</protocVersion>
+        <protocCommand>${protoc.path}</protocCommand>
 				<imports>
 					<param>
 						${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Tue Sep  3 18:30:05 2013
@@ -80,6 +80,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -148,8 +149,14 @@ public class HistoryClientService extend
         JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS,
         JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT);
     // NOTE: there should be a .at(InetSocketAddress)
-    WebApps.$for("jobhistory", HistoryClientService.class, this, "ws")
-        .with(conf).at(NetUtils.getHostPortString(bindAddress)).start(webApp);
+    WebApps
+        .$for("jobhistory", HistoryClientService.class, this, "ws")
+        .with(conf)
+        .withHttpSpnegoKeytabKey(
+            JHAdminConfig.MR_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
+        .withHttpSpnegoPrincipalKey(
+            JHAdminConfig.MR_WEBAPP_SPNEGO_USER_NAME_KEY)
+        .at(NetUtils.getHostPortString(bindAddress)).start(webApp);
     conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
                            webApp.getListenerAddress());
   }

Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1519787&r1=1519786&r2=1519787&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue Sep  3 18:30:05 2013
@@ -381,4 +381,10 @@ public class JobHistory extends Abstract
     // Not implemented.
     return null;
   }
+
+  @Override
+  public boolean isLastAMRetry() {
+    // bogus - Not Required
+    return false;
+  }
 }