Posted to mapreduce-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:52 UTC

svn commit: r1619012 [2/7] - in /hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Tue Aug 19 23:49:39 2014
@@ -24,8 +24,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
@@ -51,11 +57,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Runs the container task locally in a thread.
  * Since all (sub)tasks share the same local directory, they must be executed
@@ -71,7 +80,8 @@ public class LocalContainerLauncher exte
   private final HashSet<File> localizedFiles;
   private final AppContext context;
   private final TaskUmbilicalProtocol umbilical;
-  private Thread eventHandlingThread;
+  private ExecutorService taskRunner;
+  private Thread eventHandler;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
 
@@ -115,14 +125,24 @@ public class LocalContainerLauncher exte
   }
 
   public void serviceStart() throws Exception {
-    eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner");
-    eventHandlingThread.start();
+    // create a single thread for serial execution of tasks
+    // make it a daemon thread so that the process can exit even if the task is
+    // not interruptible
+    taskRunner =
+        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+            setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
+    // create and start an event handling thread
+    eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
+    eventHandler.start();
     super.serviceStart();
   }
 
   public void serviceStop() throws Exception {
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
+    if (eventHandler != null) {
+      eventHandler.interrupt();
+    }
+    if (taskRunner != null) {
+      taskRunner.shutdownNow();
     }
     super.serviceStop();
   }
@@ -158,12 +178,17 @@ public class LocalContainerLauncher exte
    *   - runs Task (runSubMap() or runSubReduce())
    *     - TA can safely send TA_UPDATE since in RUNNING state
    */
-  private class SubtaskRunner implements Runnable {
+  private class EventHandler implements Runnable {
 
+    // doneWithMaps and finishedSubMaps are accessed from only
+    // one thread. Therefore, no need to make them volatile.
     private boolean doneWithMaps = false;
     private int finishedSubMaps = 0;
 
-    SubtaskRunner() {
+    private final Map<TaskAttemptId,Future<?>> futures =
+        new ConcurrentHashMap<TaskAttemptId,Future<?>>();
+
+    EventHandler() {
     }
 
     @SuppressWarnings("unchecked")
@@ -172,7 +197,7 @@ public class LocalContainerLauncher exte
       ContainerLauncherEvent event = null;
 
       // Collect locations of map outputs to give to reduces
-      Map<TaskAttemptID, MapOutputFile> localMapFiles =
+      final Map<TaskAttemptID, MapOutputFile> localMapFiles =
           new HashMap<TaskAttemptID, MapOutputFile>();
       
       // _must_ either run subtasks sequentially or accept expense of new JVMs
@@ -183,81 +208,41 @@ public class LocalContainerLauncher exte
           event = eventQueue.take();
         } catch (InterruptedException e) {  // mostly via T_KILL? JOB_KILL?
           LOG.error("Returning, interrupted : " + e);
-          return;
+          break;
         }
 
         LOG.info("Processing the event " + event.toString());
 
         if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
 
-          ContainerRemoteLaunchEvent launchEv =
+          final ContainerRemoteLaunchEvent launchEv =
               (ContainerRemoteLaunchEvent)event;
-          TaskAttemptId attemptID = launchEv.getTaskAttemptID(); 
-
-          Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
-          int numMapTasks = job.getTotalMaps();
-          int numReduceTasks = job.getTotalReduces();
-
-          // YARN (tracking) Task:
-          org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
-              job.getTask(attemptID.getTaskId());
-          // classic mapred Task:
-          org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
-
-          // after "launching," send launched event to task attempt to move
-          // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
-          // do getRemoteTask() call first)
           
-          //There is no port number because we are not really talking to a task
-          // tracker.  The shuffle is just done through local files.  So the
-          // port number is set to -1 in this case.
-          context.getEventHandler().handle(
-              new TaskAttemptContainerLaunchedEvent(attemptID, -1));
-
-          if (numMapTasks == 0) {
-            doneWithMaps = true;
-          }
-
-          try {
-            if (remoteTask.isMapOrReduce()) {
-              JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
-              jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
-              if (remoteTask.isMapTask()) {
-                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
-              } else {
-                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
-              }
-              context.getEventHandler().handle(jce);
+          // execute the task on a separate thread
+          Future<?> future = taskRunner.submit(new Runnable() {
+            public void run() {
+              runTask(launchEv, localMapFiles);
             }
-            runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
-                       (numReduceTasks > 0), localMapFiles);
-            
-          } catch (RuntimeException re) {
-            JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
-            context.getEventHandler().handle(jce);
-            // this is our signal that the subtask failed in some way, so
-            // simulate a failed JVM/container and send a container-completed
-            // event to task attempt (i.e., move state machine from RUNNING
-            // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
-            context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
-                TaskAttemptEventType.TA_CONTAINER_COMPLETED));
-          } catch (IOException ioe) {
-            // if umbilical itself barfs (in error-handler of runSubMap()),
-            // we're pretty much hosed, so do what YarnChild main() does
-            // (i.e., exit clumsily--but can never happen, so no worries!)
-            LOG.fatal("oopsie...  this can never happen: "
-                + StringUtils.stringifyException(ioe));
-            System.exit(-1);
-          }
+          });
+          // remember the current attempt
+          futures.put(event.getTaskAttemptID(), future);
 
         } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
 
-          // no container to kill, so just send "cleaned" event to task attempt
-          // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state
-          // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
+          // cancel (and interrupt) the currently running task associated
+          // with the event
+          TaskAttemptId taId = event.getTaskAttemptID();
+          Future<?> future = futures.remove(taId);
+          if (future != null) {
+            LOG.info("canceling the task attempt " + taId);
+            future.cancel(true);
+          }
+
+          // send "cleaned" event to task attempt to move us from
+          // SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state (or 
+          // {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
           context.getEventHandler().handle(
-              new TaskAttemptEvent(event.getTaskAttemptID(),
+              new TaskAttemptEvent(taId,
                   TaskAttemptEventType.TA_CONTAINER_CLEANED));
 
         } else {
@@ -267,7 +252,75 @@ public class LocalContainerLauncher exte
       }
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings("unchecked")
+    private void runTask(ContainerRemoteLaunchEvent launchEv,
+        Map<TaskAttemptID, MapOutputFile> localMapFiles) {
+      TaskAttemptId attemptID = launchEv.getTaskAttemptID(); 
+
+      Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
+      int numMapTasks = job.getTotalMaps();
+      int numReduceTasks = job.getTotalReduces();
+
+      // YARN (tracking) Task:
+      org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+          job.getTask(attemptID.getTaskId());
+      // classic mapred Task:
+      org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+      // after "launching," send launched event to task attempt to move
+      // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
+      // do getRemoteTask() call first)
+      
+      //There is no port number because we are not really talking to a task
+      // tracker.  The shuffle is just done through local files.  So the
+      // port number is set to -1 in this case.
+      context.getEventHandler().handle(
+          new TaskAttemptContainerLaunchedEvent(attemptID, -1));
+
+      if (numMapTasks == 0) {
+        doneWithMaps = true;
+      }
+
+      try {
+        if (remoteTask.isMapOrReduce()) {
+          JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+          jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+          if (remoteTask.isMapTask()) {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+          } else {
+            jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+          }
+          context.getEventHandler().handle(jce);
+        }
+        runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+                   (numReduceTasks > 0), localMapFiles);
+        
+      } catch (RuntimeException re) {
+        JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+        jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+        context.getEventHandler().handle(jce);
+        // this is our signal that the subtask failed in some way, so
+        // simulate a failed JVM/container and send a container-completed
+        // event to task attempt (i.e., move state machine from RUNNING
+        // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
+        context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+      } catch (IOException ioe) {
+        // if umbilical itself barfs (in error-handler of runSubMap()),
+        // we're pretty much hosed, so do what YarnChild main() does
+        // (i.e., exit clumsily--but can never happen, so no worries!)
+        LOG.fatal("oopsie...  this can never happen: "
+            + StringUtils.stringifyException(ioe));
+        ExitUtil.terminate(-1);
+      } finally {
+        // remove the future tracked for this attempt
+        if (futures.remove(attemptID) != null) {
+          LOG.info("removed attempt " + attemptID +
+              " from the map of tracked futures");
+        }
+      }
+    }
+
     private void runSubtask(org.apache.hadoop.mapred.Task task,
                             final TaskType taskType,
                             TaskAttemptId attemptID,
@@ -355,7 +408,9 @@ public class LocalContainerLauncher exte
       } catch (FSError e) {
         LOG.fatal("FSError from child", e);
         // umbilical:  MRAppMaster creates (taskAttemptListener), passes to us
-        umbilical.fsError(classicAttemptID, e.getMessage());
+        if (!ShutdownHookManager.get().isShutdownInProgress()) {
+          umbilical.fsError(classicAttemptID, e.getMessage());
+        }
         throw new RuntimeException();
 
       } catch (Exception exception) {
@@ -378,54 +433,18 @@ public class LocalContainerLauncher exte
       } catch (Throwable throwable) {
         LOG.fatal("Error running local (uberized) 'child' : "
             + StringUtils.stringifyException(throwable));
-        Throwable tCause = throwable.getCause();
-        String cause = (tCause == null)
-            ? throwable.getMessage()
-                : StringUtils.stringifyException(tCause);
-            umbilical.fatalError(classicAttemptID, cause);
+        if (!ShutdownHookManager.get().isShutdownInProgress()) {
+          Throwable tCause = throwable.getCause();
+          String cause =
+              (tCause == null) ? throwable.getMessage() : StringUtils
+                  .stringifyException(tCause);
+          umbilical.fatalError(classicAttemptID, cause);
+        }
         throw new RuntimeException();
       }
     }
 
     /**
-     * Within the _local_ filesystem (not HDFS), all activity takes place within
-     * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
-     * and all sub-MapTasks create the same filename ("file.out").  Rename that
-     * to something unique (e.g., "map_0.out") to avoid collisions.
-     *
-     * Longer-term, we'll modify [something] to use TaskAttemptID-based
-     * filenames instead of "file.out". (All of this is entirely internal,
-     * so there are no particular compatibility issues.)
-     */
-    @SuppressWarnings("deprecation")
-    private MapOutputFile renameMapOutputForReduce(JobConf conf,
-        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      // move map output to reduce input
-      Path mapOut = subMapOutputFile.getOutputFile();
-      FileStatus mStatus = localFs.getFileStatus(mapOut);      
-      Path reduceIn = subMapOutputFile.getInputFileForWrite(
-          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
-      Path mapOutIndex = new Path(mapOut.toString() + ".index");
-      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Renaming map output file for task attempt "
-            + mapId.toString() + " from original location " + mapOut.toString()
-            + " to destination " + reduceIn.toString());
-      }
-      if (!localFs.mkdirs(reduceIn.getParent())) {
-        throw new IOException("Mkdirs failed to create "
-            + reduceIn.getParent().toString());
-      }
-      if (!localFs.rename(mapOut, reduceIn))
-        throw new IOException("Couldn't rename " + mapOut);
-      if (!localFs.rename(mapOutIndex, reduceInIndex))
-        throw new IOException("Couldn't rename " + mapOutIndex);
-      
-      return new RenamedMapOutputFile(reduceIn);
-    }
-
-    /**
      * Also within the local filesystem, we need to restore the initial state
      * of the directory as much as possible.  Compare current contents against
      * the saved original state and nuke everything that doesn't belong, with
@@ -456,8 +475,47 @@ public class LocalContainerLauncher exte
       }
     }
 
-  } // end SubtaskRunner
-  
+  } // end EventHandler
+
+  /**
+   * Within the _local_ filesystem (not HDFS), all activity takes place within
+   * a subdir inside one of the LOCAL_DIRS
+   * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+   * and all sub-MapTasks create the same filename ("file.out").  Rename that
+   * to something unique (e.g., "map_0.out") to avoid possible collisions.
+   *
+   * Longer-term, we'll modify [something] to use TaskAttemptID-based
+   * filenames instead of "file.out". (All of this is entirely internal,
+   * so there are no particular compatibility issues.)
+   */
+  @VisibleForTesting
+  protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // move map output to reduce input
+    Path mapOut = subMapOutputFile.getOutputFile();
+    FileStatus mStatus = localFs.getFileStatus(mapOut);
+    Path reduceIn = subMapOutputFile.getInputFileForWrite(
+        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming map output file for task attempt "
+          + mapId.toString() + " from original location " + mapOut.toString()
+          + " to destination " + reduceIn.toString());
+    }
+    if (!localFs.mkdirs(reduceIn.getParent())) {
+      throw new IOException("Mkdirs failed to create "
+          + reduceIn.getParent().toString());
+    }
+    if (!localFs.rename(mapOut, reduceIn))
+      throw new IOException("Couldn't rename " + mapOut);
+    if (!localFs.rename(mapOutIndex, reduceInIndex))
+      throw new IOException("Couldn't rename " + mapOutIndex);
+
+    return new RenamedMapOutputFile(reduceIn);
+  }
+
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
     

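The core of the LocalContainerLauncher change above is the move from a single do-everything SubtaskRunner thread to an EventHandler thread plus a single-threaded daemon executor, with a Future per attempt so a CONTAINER_REMOTE_CLEANUP event can interrupt a running task. A minimal standalone sketch of that pattern (class and identifier names here are illustrative, not the Hadoop ones):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class SerialTaskRunnerSketch {
  // one daemon thread: subtasks run serially and cannot block JVM exit
  private final ExecutorService taskRunner =
      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
          .setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
  // attempt id -> future, so cleanup events can cancel a running attempt
  private final Map<String, Future<?>> futures =
      new ConcurrentHashMap<String, Future<?>>();

  public void launch(final String attemptId, final Runnable subtask) {
    Future<?> future = taskRunner.submit(new Runnable() {
      public void run() {
        try {
          subtask.run();
        } finally {
          futures.remove(attemptId);   // forget the attempt once it is done
        }
      }
    });
    futures.put(attemptId, future);
  }

  public void cleanup(String attemptId) {
    Future<?> future = futures.remove(attemptId);
    if (future != null) {
      future.cancel(true);             // interrupts the task if still running
    }
  }

  public void stop() {
    taskRunner.shutdownNow();
  }
}

Note also the switch from System.exit(-1) to ExitUtil.terminate(-1) in the IOException handler: ExitUtil lets tests disable real JVM exits and assert on the attempted exit instead.
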
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Tue Aug 19 23:49:39 2014
@@ -141,7 +141,9 @@ public class TaskAttemptListenerImpl ext
       }
 
       server.start();
-      this.address = NetUtils.getConnectAddress(server);
+      this.address = NetUtils.createSocketAddrForHost(
+          context.getNMHostname(),
+          server.getListenerAddress().getPort());
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
     }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Tue Aug 19 23:49:39 2014
@@ -56,6 +56,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -176,7 +177,9 @@ class YarnChild {
       });
     } catch (FSError e) {
       LOG.fatal("FSError from child", e);
-      umbilical.fsError(taskid, e.getMessage());
+      if (!ShutdownHookManager.get().isShutdownInProgress()) {
+        umbilical.fsError(taskid, e.getMessage());
+      }
     } catch (Exception exception) {
       LOG.warn("Exception running child : "
           + StringUtils.stringifyException(exception));
@@ -201,17 +204,22 @@ class YarnChild {
       }
       // Report back any failures, for diagnostic purposes
       if (taskid != null) {
-        umbilical.fatalError(taskid, StringUtils.stringifyException(exception));
+        if (!ShutdownHookManager.get().isShutdownInProgress()) {
+          umbilical.fatalError(taskid,
+              StringUtils.stringifyException(exception));
+        }
       }
     } catch (Throwable throwable) {
       LOG.fatal("Error running child : "
     	        + StringUtils.stringifyException(throwable));
       if (taskid != null) {
-        Throwable tCause = throwable.getCause();
-        String cause = tCause == null
-                                 ? throwable.getMessage()
-                                 : StringUtils.stringifyException(tCause);
-        umbilical.fatalError(taskid, cause);
+        if (!ShutdownHookManager.get().isShutdownInProgress()) {
+          Throwable tCause = throwable.getCause();
+          String cause =
+              tCause == null ? throwable.getMessage() : StringUtils
+                  .stringifyException(tCause);
+          umbilical.fatalError(taskid, cause);
+        }
       }
     } finally {
       RPC.stopProxy(umbilical);

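The guards added to YarnChild above (and the matching ones in LocalContainerLauncher) follow one pattern: skip the umbilical RPC if the JVM is already shutting down, since the RPC machinery may have been torn down by a shutdown hook and reporting would only produce a secondary failure. A hedged sketch, with a hypothetical Reporter interface standing in for TaskUmbilicalProtocol:

import java.io.IOException;

import org.apache.hadoop.util.ShutdownHookManager;

public class ShutdownGuardSketch {
  /** Hypothetical stand-in for the umbilical's error-reporting calls. */
  interface Reporter {
    void fatalError(String taskId, String cause) throws IOException;
  }

  static void reportFatal(Reporter umbilical, String taskId, Throwable t)
      throws IOException {
    // during shutdown the RPC proxy may already be stopped; rely on the
    // log output that was written before reaching this point
    if (!ShutdownHookManager.get().isShutdownInProgress()) {
      Throwable cause = (t.getCause() == null) ? t : t.getCause();
      umbilical.fatalError(taskId, cause.toString());
    }
  }
}
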
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Aug 19 23:49:39 2014
@@ -28,13 +28,13 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -74,7 +74,9 @@ public class JobHistoryEventHandler exte
 
   private int eventCounter;
 
-  //TODO Does the FS object need to be different ? 
+  // These file systems may differ from the default file system of the job
+  // configuration. See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
+  // #ensurePathInDefaultFileSystem
   private FileSystem stagingDirFS; // log Dir FileSystem
   private FileSystem doneDirFS; // done Dir FileSystem
 
@@ -141,7 +143,7 @@ public class JobHistoryEventHandler exte
     //Check for the existence of the history staging dir. Maybe create it. 
     try {
       stagingDirPath =
-          FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
+          FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
       stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
       mkdir(stagingDirFS, stagingDirPath, new FsPermission(
           JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
@@ -154,7 +156,7 @@ public class JobHistoryEventHandler exte
     //Check for the existence of intermediate done dir.
     Path doneDirPath = null;
     try {
-      doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
+      doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
       doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
       // This directory will be in a common location, or this may be a cluster
       // meant for a single user. Creating based on the conf. Should ideally be
@@ -194,7 +196,7 @@ public class JobHistoryEventHandler exte
     //Check/create user directory under intermediate done dir.
     try {
       doneDirPrefixPath =
-          FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
+          FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
       mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
           JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
     } catch (IOException e) {

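The three makeQualified changes above share one idea: qualify the configured directory with FileContext (which resolves a scheme-less path against the default file system) and then look up the file system that actually owns the qualified URI, since the two can differ. A minimal sketch of the pattern, using a made-up staging path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class QualifyDirSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // qualify a scheme-less path against the default file system
    Path staging = FileContext.getFileContext(conf)
        .makeQualified(new Path("/tmp/hadoop-yarn/staging"));
    // then fetch the file system that owns the qualified URI, which may
    // differ from FileSystem.get(conf) once a scheme is attached
    FileSystem stagingFs = FileSystem.get(staging.toUri(), conf);
    System.out.println(staging + " is served by " + stagingFs.getUri());
  }
}
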
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java Tue Aug 19 23:49:39 2014
@@ -66,4 +66,5 @@ public interface AppContext {
 
   boolean hasSuccessfullyUnregistered();
 
+  String getNMHostname();
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Aug 19 23:49:39 2014
@@ -41,13 +41,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -67,6 +67,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -184,7 +185,6 @@ public class MRAppMaster extends Composi
   private final int nmPort;
   private final int nmHttpPort;
   protected final MRAppMetrics metrics;
-  private final int maxAppAttempts;
   private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private List<AMInfo> amInfos;
   private AppContext context;
@@ -199,6 +199,7 @@ public class MRAppMaster extends Composi
       new JobTokenSecretManager();
   private JobId jobId;
   private boolean newApiCommitter;
+  private ClassLoader jobClassLoader;
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
@@ -225,14 +226,14 @@ public class MRAppMaster extends Composi
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      long appSubmitTime, int maxAppAttempts) {
+      long appSubmitTime) {
     this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime, maxAppAttempts);
+        new SystemClock(), appSubmitTime);
   }
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      Clock clock, long appSubmitTime, int maxAppAttempts) {
+      Clock clock, long appSubmitTime) {
     super(MRAppMaster.class.getName());
     this.clock = clock;
     this.startTime = clock.getTime();
@@ -243,25 +244,21 @@ public class MRAppMaster extends Composi
     this.nmPort = nmPort;
     this.nmHttpPort = nmHttpPort;
     this.metrics = MRAppMetrics.create();
-    this.maxAppAttempts = maxAppAttempts;
     logSyncer = TaskLog.createLogSyncer();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
 
   @Override
   protected void serviceInit(final Configuration conf) throws Exception {
+    // create the job classloader if enabled
+    createJobClassLoader(conf);
+
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     initJobCredentialsAndUGI(conf);
 
     context = new RunningAppContext(conf);
 
-    ((RunningAppContext)context).computeIsLastAMRetry();
-    LOG.info("The specific max attempts: " + maxAppAttempts +
-        " for application: " + appAttemptID.getApplicationId().getId() +
-        ". Attempt num: " + appAttemptID.getAttemptId() +
-        " is last retry: " + isLastAMRetry);
-
     // Job name is the same as the app name until we support DAG of jobs
     // for an app later
     appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
@@ -393,8 +390,13 @@ public class MRAppMaster extends Composi
       addIfService(committerEventHandler);
 
       //policy handling preemption requests from RM
-      preemptionPolicy = createPreemptionPolicy(conf);
-      preemptionPolicy.init(context);
+      callWithJobClassLoader(conf, new Action<Void>() {
+        public Void call(Configuration conf) {
+          preemptionPolicy = createPreemptionPolicy(conf);
+          preemptionPolicy.init(context);
+          return null;
+        }
+      });
 
       //service to handle requests to TaskUmbilicalProtocol
       taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
@@ -459,33 +461,37 @@ public class MRAppMaster extends Composi
   }
 
   private OutputCommitter createOutputCommitter(Configuration conf) {
-    OutputCommitter committer = null;
-
-    LOG.info("OutputCommitter set in config "
-        + conf.get("mapred.output.committer.class"));
-
-    if (newApiCommitter) {
-      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
-          .newTaskId(jobId, 0, TaskType.MAP);
-      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
-          .newTaskAttemptId(taskID, 0);
-      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
-          TypeConverter.fromYarn(attemptID));
-      OutputFormat outputFormat;
-      try {
-        outputFormat = ReflectionUtils.newInstance(taskContext
-            .getOutputFormatClass(), conf);
-        committer = outputFormat.getOutputCommitter(taskContext);
-      } catch (Exception e) {
-        throw new YarnRuntimeException(e);
+    return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
+      public OutputCommitter call(Configuration conf) {
+        OutputCommitter committer = null;
+
+        LOG.info("OutputCommitter set in config "
+            + conf.get("mapred.output.committer.class"));
+
+        if (newApiCommitter) {
+          org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+              MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+              MRBuilderUtils.newTaskAttemptId(taskID, 0);
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+              TypeConverter.fromYarn(attemptID));
+          OutputFormat outputFormat;
+          try {
+            outputFormat = ReflectionUtils.newInstance(taskContext
+                .getOutputFormatClass(), conf);
+            committer = outputFormat.getOutputCommitter(taskContext);
+          } catch (Exception e) {
+            throw new YarnRuntimeException(e);
+          }
+        } else {
+          committer = ReflectionUtils.newInstance(conf.getClass(
+              "mapred.output.committer.class", FileOutputCommitter.class,
+              org.apache.hadoop.mapred.OutputCommitter.class), conf);
+        }
+        LOG.info("OutputCommitter is " + committer.getClass().getName());
+        return committer;
       }
-    } else {
-      committer = ReflectionUtils.newInstance(conf.getClass(
-          "mapred.output.committer.class", FileOutputCommitter.class,
-          org.apache.hadoop.mapred.OutputCommitter.class), conf);
-    }
-    LOG.info("OutputCommitter is " + committer.getClass().getName());
-    return committer;
+    });
   }
 
   protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
@@ -600,10 +606,6 @@ public class MRAppMaster extends Composi
       LOG.warn("Graceful stop failed ", t);
     }
 
-    //Bring the process down by force.
-    //Not needed after HADOOP-7140
-    LOG.info("Exiting MR AppMaster..GoodBye!");
-    sysexit();   
   }
  
   private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
@@ -677,38 +679,42 @@ public class MRAppMaster extends Composi
     return new StagingDirCleaningService();
   }
 
-  protected Speculator createSpeculator(Configuration conf, AppContext context) {
-    Class<? extends Speculator> speculatorClass;
-
-    try {
-      speculatorClass
-          // "yarn.mapreduce.job.speculator.class"
-          = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
-                          DefaultSpeculator.class,
-                          Speculator.class);
-      Constructor<? extends Speculator> speculatorConstructor
-          = speculatorClass.getConstructor
-               (Configuration.class, AppContext.class);
-      Speculator result = speculatorConstructor.newInstance(conf, context);
-
-      return result;
-    } catch (InstantiationException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (InvocationTargetException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    } catch (NoSuchMethodException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
-      throw new YarnRuntimeException(ex);
-    }
+  protected Speculator createSpeculator(Configuration conf,
+      final AppContext context) {
+    return callWithJobClassLoader(conf, new Action<Speculator>() {
+      public Speculator call(Configuration conf) {
+        Class<? extends Speculator> speculatorClass;
+        try {
+          speculatorClass
+              // "yarn.mapreduce.job.speculator.class"
+              = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+                              DefaultSpeculator.class,
+                              Speculator.class);
+          Constructor<? extends Speculator> speculatorConstructor
+              = speculatorClass.getConstructor
+                   (Configuration.class, AppContext.class);
+          Speculator result = speculatorConstructor.newInstance(conf, context);
+
+          return result;
+        } catch (InstantiationException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (IllegalAccessException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (InvocationTargetException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        } catch (NoSuchMethodException ex) {
+          LOG.error("Can't make a speculator -- check "
+              + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          throw new YarnRuntimeException(ex);
+        }
+      }
+    });
   }
 
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
@@ -722,7 +728,7 @@ public class MRAppMaster extends Composi
   protected EventHandler<CommitterEvent> createCommitterEventHandler(
       AppContext context, OutputCommitter committer) {
     return new CommitterEventHandler(context, committer,
-        getRMHeartbeatHandler());
+        getRMHeartbeatHandler(), jobClassLoader);
   }
 
   protected ContainerAllocator createContainerAllocator(
@@ -1009,8 +1015,13 @@ public class MRAppMaster extends Composi
       successfullyUnregistered.set(true);
     }
 
-    public void computeIsLastAMRetry() {
-      isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+    public void resetIsLastAMRetry() {
+      isLastAMRetry = false;
+    }
+
+    @Override
+    public String getNMHostname() {
+      return nmHost;
     }
   }
 
@@ -1054,6 +1065,7 @@ public class MRAppMaster extends Composi
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
 
+    boolean initFailed = false;
     if (!errorHappenedShutDown) {
       // create a job event for job initialization
       JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
@@ -1062,6 +1074,10 @@ public class MRAppMaster extends Composi
       // job-init to be done completely here.
       jobEventDispatcher.handle(initJobEvent);
 
+      // If job is still not initialized, an error happened during
+      // initialization. Must complete starting all of the services so failure
+      // events can be processed.
+      initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
 
       // JobImpl's InitTransition is done (call above is synchronous), so the
       // "uber-decision" (MR-1220) has been made.  Query job and switch to
@@ -1088,10 +1104,16 @@ public class MRAppMaster extends Composi
     //start all the components
     super.serviceStart();
 
-    // set job classloader if configured
-    MRApps.setJobClassLoader(getConfig());
-    // All components have started, start the job.
-    startJobs();
+    // finally set the job classloader
+    MRApps.setClassLoader(jobClassLoader, getConfig());
+
+    if (initFailed) {
+      JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
+      jobEventDispatcher.handle(initFailedEvent);
+    } else {
+      // All components have started, start the job.
+      startJobs();
+    }
   }
   
   @Override
@@ -1100,7 +1122,29 @@ public class MRAppMaster extends Composi
     TaskLog.syncLogsShutdown(logSyncer);
   }
 
-  private void processRecovery() {
+  private boolean isRecoverySupported() throws IOException {
+    boolean isSupported = false;
+    Configuration conf = getConfig();
+    if (committer != null) {
+      final JobContext _jobContext;
+      if (newApiCommitter) {
+        _jobContext = new JobContextImpl(
+            conf, TypeConverter.fromYarn(getJobId()));
+      } else {
+        _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+            new JobConf(conf), TypeConverter.fromYarn(getJobId()));
+      }
+      isSupported = callWithJobClassLoader(conf,
+          new ExceptionAction<Boolean>() {
+            public Boolean call(Configuration conf) throws IOException {
+              return committer.isRecoverySupported(_jobContext);
+            }
+      });
+    }
+    return isSupported;
+  }
+
+  private void processRecovery() throws IOException {
     if (appAttemptID.getAttemptId() == 1) {
       return;  // no need to recover on the first attempt
     }
@@ -1108,8 +1152,8 @@ public class MRAppMaster extends Composi
     boolean recoveryEnabled = getConfig().getBoolean(
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
-    boolean recoverySupportedByCommitter =
-        committer != null && committer.isRecoverySupported();
+
+    boolean recoverySupportedByCommitter = isRecoverySupported();
 
     // If a shuffle secret was not provided by the job client then this app
     // attempt will generate one.  However that disables recovery if there
@@ -1294,7 +1338,7 @@ public class MRAppMaster extends Composi
       this.conf = config;
     }
     @Override
-    public void handle(SpeculatorEvent event) {
+    public void handle(final SpeculatorEvent event) {
       if (disabled) {
         return;
       }
@@ -1321,7 +1365,12 @@ public class MRAppMaster extends Composi
       if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
         || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
         // Speculator IS enabled, direct the event to there.
-        speculator.handle(event);
+        callWithJobClassLoader(conf, new Action<Void>() {
+          public Void call(Configuration conf) {
+            speculator.handle(event);
+            return null;
+          }
+        });
       }
     }
 
@@ -1362,8 +1411,6 @@ public class MRAppMaster extends Composi
           System.getenv(Environment.NM_HTTP_PORT.name());
       String appSubmitTimeStr =
           System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
-      String maxAppAttempts =
-          System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
       
       validateInputParam(containerIdStr,
           Environment.CONTAINER_ID.name());
@@ -1373,8 +1420,6 @@ public class MRAppMaster extends Composi
           Environment.NM_HTTP_PORT.name());
       validateInputParam(appSubmitTimeStr,
           ApplicationConstants.APP_SUBMIT_TIME_ENV);
-      validateInputParam(maxAppAttempts,
-          ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
 
       ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
       ApplicationAttemptId applicationAttemptId =
@@ -1385,8 +1430,7 @@ public class MRAppMaster extends Composi
       MRAppMaster appMaster =
           new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
-              Integer.parseInt(nodeHttpPortString), appSubmitTime,
-              Integer.parseInt(maxAppAttempts));
+              Integer.parseInt(nodeHttpPortString), appSubmitTime);
       ShutdownHookManager.get().addShutdownHook(
         new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
       JobConf conf = new JobConf(new YarnConfiguration());
@@ -1486,6 +1530,102 @@ public class MRAppMaster extends Composi
     });
   }
 
+  /**
+   * Creates a job classloader based on the configuration if the job classloader
+   * is enabled. It is a no-op if the job classloader is not enabled.
+   */
+  private void createJobClassLoader(Configuration conf) throws IOException {
+    jobClassLoader = MRApps.createJobClassLoader(conf);
+  }
+
+  /**
+   * Executes the given action with the job classloader set as the configuration
+   * classloader as well as the thread context class loader if the job
+   * classloader is enabled. After the call, the original classloader is
+   * restored.
+   *
+   * If the job classloader is enabled and the code needs to load user-supplied
+   * classes via configuration or thread context classloader, this method should
+   * be used in order to load them.
+   *
+   * @param conf the configuration on which the classloader will be set
+   * @param action the callable action to be executed
+   */
+  <T> T callWithJobClassLoader(Configuration conf, Action<T> action) {
+    // if the job classloader is enabled, we may need it to load the (custom)
+    // classes; we make the job classloader available and unset it once it is
+    // done
+    ClassLoader currentClassLoader = conf.getClassLoader();
+    boolean setJobClassLoader =
+        jobClassLoader != null && currentClassLoader != jobClassLoader;
+    if (setJobClassLoader) {
+      MRApps.setClassLoader(jobClassLoader, conf);
+    }
+    try {
+      return action.call(conf);
+    } finally {
+      if (setJobClassLoader) {
+        // restore the original classloader
+        MRApps.setClassLoader(currentClassLoader, conf);
+      }
+    }
+  }
+
+  /**
+   * Executes the given action that can throw a checked exception with the job
+   * classloader set as the configuration classloader as well as the thread
+   * context class loader if the job classloader is enabled. After the call, the
+   * original classloader is restored.
+   *
+   * If the job classloader is enabled and the code needs to load user-supplied
+   * classes via configuration or thread context classloader, this method should
+   * be used in order to load them.
+   *
+   * @param conf the configuration on which the classloader will be set
+   * @param action the callable action to be executed
+   * @throws IOException if the underlying action throws an IOException
+   * @throws YarnRuntimeException if the underlying action throws an exception
+   * other than an IOException
+   */
+  <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action)
+      throws IOException {
+    // if the job classloader is enabled, we may need it to load the (custom)
+    // classes; we make the job classloader available and unset it once it is
+    // done
+    ClassLoader currentClassLoader = conf.getClassLoader();
+    boolean setJobClassLoader =
+        jobClassLoader != null && currentClassLoader != jobClassLoader;
+    if (setJobClassLoader) {
+      MRApps.setClassLoader(jobClassLoader, conf);
+    }
+    try {
+      return action.call(conf);
+    } catch (IOException e) {
+      throw e;
+    } catch (YarnRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      // wrap it with a YarnRuntimeException
+      throw new YarnRuntimeException(e);
+    } finally {
+      if (setJobClassLoader) {
+        // restore the original classloader
+        MRApps.setClassLoader(currentClassLoader, conf);
+      }
+    }
+  }
+
+  /**
+   * An action to be executed with the job classloader set and then restored.
+   */
+  private static interface Action<T> {
+    T call(Configuration conf);
+  }
+
+  private static interface ExceptionAction<T> {
+    T call(Configuration conf) throws Exception;
+  }
+
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();

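Most of the MRAppMaster changes route user-visible work (committer, speculator, preemption policy) through callWithJobClassLoader. Stripped of the MRApps/Configuration plumbing, the pattern is a classloader swap with a guaranteed restore; a standalone sketch under those assumptions (JobAction is a hypothetical stand-in for the private Action<T> interface, and only the thread-context side is shown):

public class ClassLoaderSwapSketch {
  interface JobAction<T> {
    T call();
  }

  static <T> T callWith(ClassLoader jobClassLoader, JobAction<T> action) {
    Thread current = Thread.currentThread();
    ClassLoader original = current.getContextClassLoader();
    // swap only when a job classloader exists and is not already installed
    boolean swap = jobClassLoader != null && original != jobClassLoader;
    if (swap) {
      current.setContextClassLoader(jobClassLoader);
    }
    try {
      return action.call();
    } finally {
      if (swap) {
        current.setContextClassLoader(original);  // always restore
      }
    }
  }
}
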
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Tue Aug 19 23:49:39 2014
@@ -131,7 +131,8 @@ public class MRClientService extends Abs
     }
 
     server.start();
-    this.bindAddress = NetUtils.getConnectAddress(server);
+    this.bindAddress = NetUtils.createSocketAddrForHost(appContext.getNMHostname(),
+        server.getListenerAddress().getPort());
     LOG.info("Instantiated MRClientService at " + this.bindAddress);
     try {
       // Explicitly disabling SSL for map reduce task as we can't allow MR users

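This bind-address change mirrors the one in TaskAttemptListenerImpl above: rather than the connect address of whatever interface the RPC server bound to, both services now advertise the NodeManager's hostname together with the actually bound port. In essence:

import java.net.InetSocketAddress;

import org.apache.hadoop.net.NetUtils;

public class AdvertisedAddressSketch {
  /** Build the address to advertise from the NM host and the bound port. */
  static InetSocketAddress advertised(String nmHostname, int boundPort) {
    return NetUtils.createSocketAddrForHost(nmHostname, boundPort);
  }
}
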
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java Tue Aug 19 23:49:39 2014
@@ -68,6 +68,7 @@ public class CommitterEventHandler exten
   private BlockingQueue<CommitterEvent> eventQueue =
       new LinkedBlockingQueue<CommitterEvent>();
   private final AtomicBoolean stopped;
+  private final ClassLoader jobClassLoader;
   private Thread jobCommitThread = null;
   private int commitThreadCancelTimeoutMs;
   private long commitWindowMs;
@@ -79,11 +80,17 @@ public class CommitterEventHandler exten
 
   public CommitterEventHandler(AppContext context, OutputCommitter committer,
       RMHeartbeatHandler rmHeartbeatHandler) {
+    this(context, committer, rmHeartbeatHandler, null);
+  }
+  
+  public CommitterEventHandler(AppContext context, OutputCommitter committer,
+      RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
     super("CommitterEventHandler");
     this.context = context;
     this.committer = committer;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
     this.stopped = new AtomicBoolean(false);
+    this.jobClassLoader = jobClassLoader;
   }
 
   @Override
@@ -109,9 +116,23 @@ public class CommitterEventHandler exten
 
   @Override
   protected void serviceStart() throws Exception {
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("CommitterEvent Processor #%d")
-      .build();
+    ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder()
+        .setNameFormat("CommitterEvent Processor #%d");
+    if (jobClassLoader != null) {
+      // if the job classloader is enabled, we need to use the job classloader
+      // as the thread context classloader (TCCL) of these threads in case the
+      // committer needs to load another class via TCCL
+      ThreadFactory backingTf = new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+          Thread thread = new Thread(r);
+          thread.setContextClassLoader(jobClassLoader);
+          return thread;
+        }
+      };
+      tfBuilder.setThreadFactory(backingTf);
+    }
+    ThreadFactory tf = tfBuilder.build();
     launcherPool = new ThreadPoolExecutor(5, 5, 1,
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
     eventHandlingThread = new Thread(new Runnable() {

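The thread-factory layering above is worth calling out: a plain backing ThreadFactory installs the job classloader as each new thread's context classloader, and Guava's ThreadFactoryBuilder adds the name format on top of it. A compact sketch of that composition:

import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class TcclThreadFactorySketch {
  static ThreadFactory committerThreadFactory(final ClassLoader jobClassLoader) {
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder()
        .setNameFormat("CommitterEvent Processor #%d");
    if (jobClassLoader != null) {
      builder.setThreadFactory(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
          Thread thread = new Thread(r);
          // the committer may load user classes via the TCCL
          thread.setContextClassLoader(jobClassLoader);
          return thread;
        }
      });
    }
    return builder.build();
  }
}
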
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Tue Aug 19 23:49:39 2014
@@ -28,6 +28,7 @@ public enum JobEventType {
 
   //Producer:MRAppMaster
   JOB_INIT,
+  JOB_INIT_FAILED,
   JOB_START,
 
   //Producer:Task

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Aug 19 23:49:39 2014
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -129,6 +130,8 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
  */
@@ -145,10 +148,10 @@ public class JobImpl implements org.apac
   private static final Log LOG = LogFactory.getLog(JobImpl.class);
 
   //The maximum fraction of fetch failures allowed for a map
-  private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
-
-  // Maximum no. of fetch-failure notifications after which map task is failed
-  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+  private float maxAllowedFetchFailuresFraction;
+  
+  //Maximum no. of fetch-failure notifications after which map task is failed
+  private int maxFetchFailuresNotifications;
 
   public static final String JOB_KILLED_DIAG =
       "Job received Kill while in RUNNING state.";
@@ -250,9 +253,12 @@ public class JobImpl implements org.apac
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
               (JobStateInternal.NEW,
-              EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+              EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
               JobEventType.JOB_INIT,
               new InitTransition())
+          .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
+              JobEventType.JOB_INIT_FAILED,
+              new InitFailedTransition())
           .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillNewJobTransition())
@@ -265,7 +271,7 @@ public class JobImpl implements org.apac
           // Ignore-able events
           .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_UPDATED_NODES)
-              
+
           // Transitions from INITED state
           .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -641,8 +647,8 @@ public class JobImpl implements org.apac
   
   private JobStateInternal forcedState = null;
 
-  //Executor used for running future tasks. Setting thread pool size to 1
-  private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+  //Executor used for running future tasks.
+  private ScheduledThreadPoolExecutor executor;
   private ScheduledFuture failWaitTriggerScheduledFuture;
 
   private JobState lastNonFinalState = JobState.NEW;
@@ -684,6 +690,13 @@ public class JobImpl implements org.apac
     this.aclsManager = new JobACLsManager(conf);
     this.username = System.getProperty("user.name");
     this.jobACLs = aclsManager.constructJobACLs(conf);
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+      .setNameFormat("Job Fail Wait Timeout Monitor #%d")
+      .setDaemon(true)
+      .build();
+    this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
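
The executor construction above replaces the old field initializer so the fail-wait monitor thread can be named and marked daemon; a daemon timer cannot keep the AM's JVM alive on exit. A hedged sketch of the same construction in isolation (the timeout value and task body are illustrative):

    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class FailWaitTimerSketch {
      public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("Job Fail Wait Timeout Monitor #%d")
            .setDaemon(true)  // an idle monitor must not block JVM exit
            .build();
        ScheduledThreadPoolExecutor executor =
            new ScheduledThreadPoolExecutor(1, threadFactory);
        ScheduledFuture<?> trigger = executor.schedule(new Runnable() {
          @Override
          public void run() {
            System.out.println("fail-wait timeout elapsed");
          }
        }, 10, TimeUnit.SECONDS);
        trigger.cancel(false);  // cancelled if the wait completes in time
        executor.shutdown();
      }
    }
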
@@ -691,6 +704,13 @@ public class JobImpl implements org.apac
     if(forcedDiagnostic != null) {
       this.diagnostics.add(forcedDiagnostic);
     }
+    
+    this.maxAllowedFetchFailuresFraction = conf.getFloat(
+        MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION,
+        MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION);
+    this.maxFetchFailuresNotifications = conf.getInt(
+        MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
+        MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
   }
 
   protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -717,7 +737,7 @@ public class JobImpl implements org.apac
     if (jobACL == null) {
       return true;
     }
-    return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
+    return aclsManager.checkAccess(callerUGI, jobOperation, userName, jobACL);
   }
 
   @Override
@@ -1205,22 +1225,25 @@ public class JobImpl implements org.apac
     boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
     boolean smallInput = (dataInputLength <= sysMaxBytes);
     // ignoring overhead due to UberAM and statics as negligible here:
+    long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
+    long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
+    long requiredMB = Math.max(requiredMapMB, requiredReduceMB);
+    int requiredMapCores = conf.getInt(
+            MRJobConfig.MAP_CPU_VCORES, 
+            MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+    int requiredReduceCores = conf.getInt(
+            MRJobConfig.REDUCE_CPU_VCORES, 
+            MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+    int requiredCores = Math.max(requiredMapCores, requiredReduceCores);    
+    if (numReduceTasks == 0) {
+      requiredMB = requiredMapMB;
+      requiredCores = requiredMapCores;
+    }
     boolean smallMemory =
-        ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
-            conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
-            <= sysMemSizeForUberSlot)
-            || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
-    boolean smallCpu =
-        (
-            Math.max(
-                conf.getInt(
-                    MRJobConfig.MAP_CPU_VCORES, 
-                    MRJobConfig.DEFAULT_MAP_CPU_VCORES), 
-                conf.getInt(
-                    MRJobConfig.REDUCE_CPU_VCORES, 
-                    MRJobConfig.DEFAULT_REDUCE_CPU_VCORES)) 
-             <= sysCPUSizeForUberSlot
-        );
+        (requiredMB <= sysMemSizeForUberSlot)
+        || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
+    
+    boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
     boolean notChainJob = !isChainJob(conf);
 
     // User has overall veto power over uberization, or user can modify
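
The rewrite above is behavior-changing for map-only jobs, not just cosmetic: reduce resource requirements no longer count against uberization when there are no reducers. A worked example with illustrative numbers:

    // Map-only job (numReduceTasks == 0), uber slot limit 2048 MB:
    //   mapreduce.map.memory.mb    = 1024
    //   mapreduce.reduce.memory.mb = 3072   (irrelevant: no reducers run)
    // old check: max(1024, 3072) = 3072 > 2048            -> smallMemory = false
    // new check: requiredMB = requiredMapMB = 1024 <= 2048 -> smallMemory = true
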
@@ -1285,6 +1308,7 @@ public class JobImpl implements org.apac
       }
     } catch (ClassNotFoundException cnfe) {
       // don't care; assume it's not derived from ChainMapper
+    } catch (NoClassDefFoundError ignored) {
     }
     try {
       String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
@@ -1295,6 +1319,7 @@ public class JobImpl implements org.apac
       }
     } catch (ClassNotFoundException cnfe) {
       // don't care; assume it's not derived from ChainReducer
+    } catch (NoClassDefFoundError ignored) {
     }
     return isChainJob;
   }
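
The added catches matter because class probing can fail in two distinct ways: ClassNotFoundException when the named class is absent, and NoClassDefFoundError when the class itself resolves but one of its dependencies does not. A hedged standalone sketch of the probing pattern (class and method names are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;

    public final class ChainProbeSketch {
      // True only if className names a loadable ChainMapper subclass.
      public static boolean isChainMapper(Configuration conf, String className) {
        if (className == null) {
          return false;
        }
        try {
          Class<?> candidate = conf.getClassByName(className);
          return ChainMapper.class.isAssignableFrom(candidate);
        } catch (ClassNotFoundException e) {
          return false;  // the class itself is absent
        } catch (NoClassDefFoundError e) {
          return false;  // the class resolved, but a dependency did not
        }
      }
    }
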
@@ -1374,6 +1399,15 @@ public class JobImpl implements org.apac
     public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
+
+      if (job.newApiCommitter) {
+        job.jobContext = new JobContextImpl(job.conf,
+            job.oldJobId);
+      } else {
+        job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+            job.conf, job.oldJobId);
+      }
+      
       try {
         setup(job);
         job.fs = job.getFileSystem(job.conf);
@@ -1409,14 +1443,6 @@ public class JobImpl implements org.apac
 
         checkTaskLimits();
 
-        if (job.newApiCommitter) {
-          job.jobContext = new JobContextImpl(job.conf,
-              job.oldJobId);
-        } else {
-          job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-              job.conf, job.oldJobId);
-        }
-        
         long inputLength = 0;
         for (int i = 0; i < job.numMapTasks; ++i) {
           inputLength += taskSplitMetaInfo[i].getInputDataLength();
@@ -1443,15 +1469,14 @@ public class JobImpl implements org.apac
 
         job.metrics.endPreparingJob(job);
         return JobStateInternal.INITED;
-      } catch (IOException e) {
+      } catch (Exception e) {
         LOG.warn("Job init failed", e);
         job.metrics.endPreparingJob(job);
         job.addDiagnostic("Job init failed : "
             + StringUtils.stringifyException(e));
-        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
-            job.jobContext,
-            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
-        return JobStateInternal.FAILED;
+        // Leave job in the NEW state. The MR AM will detect that the state is
+        // not INITED and send a JOB_INIT_FAILED event.
+        return JobStateInternal.NEW;
       }
     }
 
@@ -1552,6 +1577,16 @@ public class JobImpl implements org.apac
     }
   } // end of InitTransition
 
+  private static class InitFailedTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+                job.jobContext,
+                org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+    }
+  }
+
   private static class SetupCompletedTransition
       implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
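
With InitTransition now returning NEW on failure, some caller has to notice that init never completed and inject JOB_INIT_FAILED so InitFailedTransition can fire the committer abort. A hedged sketch of that caller-side sequence; the dispatcher and accessor names are illustrative, not taken from this patch:

    // After dispatching JOB_INIT, check whether init actually completed.
    jobEventDispatcher.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
    if (job.getInternalState() != JobStateInternal.INITED) {
      // Init failed; JobImpl stayed in NEW. Route through FAIL_ABORT so the
      // committer can abort whatever output setup may have begun.
      jobEventDispatcher.handle(
          new JobEvent(jobId, JobEventType.JOB_INIT_FAILED));
    }
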
@@ -1872,9 +1907,8 @@ public class JobImpl implements org.apac
         float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
           (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
-        boolean isMapFaulty =
-            (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
-        if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
+        if (fetchFailures >= job.getMaxFetchFailuresNotifications()
+            && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
           LOG.info("Too many fetch-failures for output of task attempt: " + 
               mapId + " ... raising fetch failure to map");
           job.eventHandler.handle(new TaskAttemptEvent(mapId, 
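
The dual threshold is unchanged in spirit but now configurable; the new MRJobConfig defaults presumably preserve the old hard-coded values (3 notifications, 0.5 fraction). A worked example with illustrative numbers:

    // 10 reducers currently shuffling, 6 of them report fetch failures
    // against one map's output (defaults assumed: 3 notifications, 0.5 fraction):
    //   fetchFailures = 6    >= 3   (notification threshold met)
    //   failureRate   = 6/10 = 0.6 >= 0.5 (fraction threshold met)
    // -> the map attempt is declared faulty and a fetch-failure event is raised
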
@@ -2157,4 +2191,12 @@ public class JobImpl implements org.apac
     jobConf.addResource(fc.open(confPath), confPath.toString());
     return jobConf;
   }
+
+  public float getMaxAllowedFetchFailuresFraction() {
+    return maxAllowedFetchFailuresFraction;
+  }
+
+  public int getMaxFetchFailuresNotifications() {
+    return maxFetchFailuresNotifications;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Aug 19 23:49:39 2014
@@ -335,6 +335,15 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+     // The AM is likely to receive duplicate TA_COMMIT_PENDINGs, as the task
+     // attempt keeps re-sending the commit message until a delivery succeeds
+     // without an IOException.
+     // Ignoring the duplicate commit message is a short-term fix. In the long
+     // term, we need a retry cache for this and other MR protocol APIs that
+     // should be treated as @AtMostOnce.
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptEventType.TA_COMMIT_PENDING)
 
      // Transitions from SUCCESS_CONTAINER_CLEANUP state
      // kill and cleanup the container

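The duplicates the new self-transition absorbs originate on the task side: the umbilical commit-pending call is retried until a round trip succeeds, so an AM that processed an earlier attempt whose response was lost will see the message again. A hedged sketch of that retry loop (the class, method, and backoff are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.mapred.TaskAttemptID;
    import org.apache.hadoop.mapred.TaskStatus;
    import org.apache.hadoop.mapred.TaskUmbilicalProtocol;

    public class CommitPendingRetrySketch {
      // Task side: keep re-sending until one delivery returns cleanly. The AM
      // may have processed an earlier attempt whose response was lost, hence
      // the AM-side self-transition that ignores duplicate TA_COMMIT_PENDINGs.
      static void sendCommitPending(TaskUmbilicalProtocol umbilical,
          TaskAttemptID taskId, TaskStatus status) throws InterruptedException {
        while (true) {
          try {
            umbilical.commitPending(taskId, status);
            return;
          } catch (IOException e) {
            Thread.sleep(1000);  // illustrative backoff before the retry
          }
        }
      }
    }
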
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Aug 19 23:49:39 2014
@@ -249,8 +249,16 @@ public abstract class TaskImpl implement
                    TaskEventType.T_ATTEMPT_SUCCEEDED))
 
     // Transitions from KILLED state
+    // There could be a race condition where TaskImpl might receive
+    // T_ATTEMPT_SUCCEEDED followed by T_ATTEMPT_KILLED for the same attempt:
+    // a. The task is in KILL_WAIT.
+    // b. Before the TA transitions to SUCCEEDED, the Task sends a TA_KILL event.
+    // c. The TA transitions to SUCCEEDED and thus sends T_ATTEMPT_SUCCEEDED
+    //    to the task. The task transitions to the KILLED state.
+    // d. The TA processes the TA_KILL event and sends T_ATTEMPT_KILLED to the task.
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(TaskEventType.T_KILL,
+                   TaskEventType.T_ATTEMPT_KILLED,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // create the topology tables

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Tue Aug 19 23:49:39 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
   private int nmPort;
   private int nmHttpPort;
   private ContainerId containerId;
+  protected int lastResponseID;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
     if (allocateResponse.getAMCommand() != null) {
       switch(allocateResponse.getAMCommand()) {
       case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");        
+        this.lastResponseID = 0;
+        register();
+        break;
       case AM_SHUTDOWN:
         LOG.info("Event from RM: shutting down Application Master");
         // This can happen if the RM has been restarted. If it is in that state,

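Resetting lastResponseID matters because allocate is a sequenced handshake: each AllocateRequest echoes the ID of the last response the AM saw, and a freshly (re-)registered AM must restart the sequence at zero or the RM will treat the heartbeat as out of order. A hedged sketch of the heartbeat side (the field name follows the patch; the class and wiring are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.exceptions.YarnException;

    public class HeartbeatSketch {
      private final ApplicationMasterProtocol scheduler;
      private int lastResponseID = 0;  // reset to 0 on AM_RESYNC + register()

      public HeartbeatSketch(ApplicationMasterProtocol scheduler) {
        this.scheduler = scheduler;
      }

      // One allocate round trip: echo the last response ID, store the new one.
      public AllocateResponse heartbeat() throws YarnException, IOException {
        AllocateRequest request =
            AllocateRequest.newInstance(lastResponseID, 0.0f, null, null, null);
        AllocateResponse response = scheduler.allocate(request);
        lastResponseID = response.getResponseId();
        return response;
      }
    }
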
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Aug 19 23:49:39 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -185,7 +186,7 @@ public abstract class RMCommunicator ext
       // if unregistration failed, isLastAMRetry needs to be recalculated
       // to see whether AM really has the chance to retry
       RunningAppContext raContext = (RunningAppContext) context;
-      raContext.computeIsLastAMRetry();
+      raContext.resetIsLastAMRetry();
     }
   }
 
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(finishState,
           sb.toString(), historyUrl);
-    while (true) {
-      FinishApplicationMasterResponse response =
-          scheduler.finishApplicationMaster(request);
-      if (response.getIsUnregistered()) {
-        // When excepting ClientService, other services are already stopped,
-        // it is safe to let clients know the final states. ClientService
-        // should wait for some time so clients have enough time to know the
-        // final states.
-        RunningAppContext raContext = (RunningAppContext) context;
-        raContext.markSuccessfulUnregistration();
-        break;
+    try {
+      while (true) {
+        FinishApplicationMasterResponse response =
+            scheduler.finishApplicationMaster(request);
+        if (response.getIsUnregistered()) {
+          // Except for ClientService, all other services are already stopped,
+          // so it is safe to let clients know the final states. ClientService
+          // should wait for some time so clients have enough time to learn the
+          // final states.
+          RunningAppContext raContext = (RunningAppContext) context;
+          raContext.markSuccessfulUnregistration();
+          break;
+        }
+        LOG.info("Waiting for application to be successfully unregistered.");
+        Thread.sleep(rmPollInterval);
       }
-      LOG.info("Waiting for application to be successfully unregistered.");
-      Thread.sleep(rmPollInterval);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      // The RM may have restarted or failed over, losing the record that this
+      // AM had already registered.
+      register();
+      doUnregistration();
     }
   }
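
Because doUnregistration() calls itself after re-registering, the retry is effectively unbounded if the RM keeps losing the registration; an iterative rendering makes the control flow easier to audit. A hedged sketch (unregisterOnce() is an illustrative helper, not a real method):

    // Equivalent loop form of the register-then-retry logic above.
    private void doUnregistrationLoop() throws Exception {
      while (true) {
        try {
          unregisterOnce();  // one finishApplicationMaster polling pass
          return;
        } catch (ApplicationMasterNotRegisteredException e) {
          register();  // RM lost the registration (restart/failover); redo it
        }
      }
    }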