You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:59 UTC

svn commit: r1619019 [10/10] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/ha...

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm Wed Aug 20 01:34:29 2014
@@ -11,8 +11,8 @@
 ~~ limitations under the License. See accompanying LICENSE file.
 
   ---
-  Hadoop Map Reduce Next Generation-${project.version} - Writing YARN 
-  Applications 
+  Hadoop Map Reduce Next Generation-${project.version} - Writing YARN
+  Applications
   ---
   ---
   ${maven.build.timestamp}
@@ -21,772 +21,737 @@ Hadoop MapReduce Next Generation - Writi
 
 %{toc|section=1|fromDepth=0}
 
-* Purpose 
+* Purpose
 
-  This document describes, at a high-level, the way to implement new 
+  This document describes, at a high-level, the way to implement new
   Applications for YARN.
 
 * Concepts and Flow
 
-  The general concept is that an 'Application Submission Client' submits an 
-  'Application' to the YARN Resource Manager. The client communicates with the 
-  ResourceManager using the 'ApplicationClientProtocol' to first acquire a new 
-  'ApplicationId' if needed via ApplicationClientProtocol#getNewApplication and then 
-  submit the 'Application' to be run via ApplicationClientProtocol#submitApplication. As 
-  part of the ApplicationClientProtocol#submitApplication call, the client needs to 
-  provide sufficient information to the ResourceManager to 'launch' the 
-  application's first container i.e. the ApplicationMaster. 
-  You need to provide information such as the details about the local 
-  files/jars that need to be available for your application to run, the actual 
-  command that needs to be executed (with the necessary command line arguments), 
-  any Unix environment settings (optional), etc. Effectively, you need to 
-  describe the Unix process(es) that needs to be launched for your 
-  ApplicationMaster. 
-
-  The YARN ResourceManager will then launch the ApplicationMaster (as specified) 
-  on an allocated container. The ApplicationMaster is then expected to 
-  communicate with the ResourceManager using the 'ApplicationMasterProtocol'. Firstly, the 
-  ApplicationMaster needs to register itself with the ResourceManager. To 
-  complete the task assigned to it, the ApplicationMaster can then request for 
-  and receive containers via ApplicationMasterProtocol#allocate. After a container is 
-  allocated to it, the ApplicationMaster communicates with the NodeManager using 
-  ContainerManager#startContainer to launch the container for its task. As part 
-  of launching this container, the ApplicationMaster has to specify the 
-  ContainerLaunchContext which, similar to the ApplicationSubmissionContext, 
-  has the launch information such as command line specification, environment, 
-  etc. Once the task is completed, the ApplicationMaster has to signal the 
-  ResourceManager of its completion via the ApplicationMasterProtocol#finishApplicationMaster. 
-
-  Meanwhile, the client can monitor the application's status by querying the 
-  ResourceManager or by directly querying the ApplicationMaster if it supports 
-  such a service. If needed, it can also kill the application via 
-  ApplicationClientProtocol#forceKillApplication.  
+  The general concept is that an <application submission client> submits an
+  <application> to the YARN <ResourceManager> (RM). This can be done through
+  setting up a <<<YarnClient>>> object. After <<<YarnClient>>> is started, the
+  client can then set up application context, prepare the very first container of
+  the application that contains the <ApplicationMaster> (AM), and then submit
+  the application. You need to provide information such as the details about the
+  local files/jars that need to be available for your application to run, the
+  actual command that needs to be executed (with the necessary command line
+  arguments), any OS environment settings (optional), etc. Effectively, you
+  need to describe the Unix process(es) that needs to be launched for your
+  ApplicationMaster.
+
+  The YARN ResourceManager will then launch the ApplicationMaster (as
+  specified) on an allocated container. The ApplicationMaster communicates with
+  YARN cluster, and handles application execution. It performs operations in an
+  asynchronous fashion. During application launch time, the main tasks of the
+  ApplicationMaster are: a) communicating with the ResourceManager to negotiate
+  and allocate resources for future containers, and b) after container
+  allocation, communicating YARN <NodeManager>s (NMs) to launch application
+  containers on them. Task a) can be performed asynchronously through an
+  <<<AMRMClientAsync>>> object, with event handling methods specified in a
+  <<<AMRMClientAsync.CallbackHandler>>> type of event handler. The event handler
+  needs to be set to the client explicitly. Task b) can be performed by launching
+  a runnable object that then launches containers when there are containers
+  allocated. As part of launching this container, the AM has to
+  specify the <<<ContainerLaunchContext>>> that has the launch information such as
+  command line specification, environment, etc.
+
+  During the execution of an application, the ApplicationMaster communicates
+  NodeManagers through <<<NMClientAsync>>> object. All container events are
+  handled by <<<NMClientAsync.CallbackHandler>>>, associated with
+  <<<NMClientAsync>>>. A typical callback handler handles client start, stop,
+  status update and error. ApplicationMaster also reports execution progress to
+  ResourceManager by handling the <<<getProgress()>>> method of
+  <<<AMRMClientAsync.CallbackHandler>>>.
+  
+  Other than asynchronous clients, there are synchronous versions for certain
+  workflows (<<<AMRMClient>>> and <<<NMClient>>>). The asynchronous clients are
+  recommended because of (subjectively) simpler usages, and this article
+  will mainly cover the asynchronous clients. Please refer to <<<AMRMClient>>>
+  and <<<NMClient>>> for more information on synchronous clients.
 
-* Interfaces 
+* Interfaces
 
   The interfaces you'd most like be concerned with are:
 
-  * ApplicationClientProtocol - Client\<--\>ResourceManager\
-    The protocol for a client that wishes to communicate with the 
-    ResourceManager to launch a new application (i.e. the ApplicationMaster), 
-    check on the status of the application or kill the application. For example, 
-    a job-client (a job launching program from the gateway) would use this 
-    protocol. 
+  * <<Client>>\<--\><<ResourceManager>>\
+    By using <<<YarnClient>>> objects.
+
+  * <<ApplicationMaster>>\<--\><<ResourceManager>>\
+    By using <<<AMRMClientAsync>>> objects, handling events asynchronously by
+    <<<AMRMClientAsync.CallbackHandler>>>
+
+  * <<ApplicationMaster>>\<--\><<NodeManager>>\
+    Launch containers. Communicate with NodeManagers
+    by using <<<NMClientAsync>>> objects, handling container events by
+    <<<NMClientAsync.CallbackHandler>>>
+
+  []
+
+  <<Note>>
   
-  * ApplicationMasterProtocol - ApplicationMaster\<--\>ResourceManager\
-    The protocol used by the ApplicationMaster to register/unregister itself 
-    to/from the ResourceManager as well as to request for resources from the 
-    Scheduler to complete its tasks. 
-    
-  * ContainerManager - ApplicationMaster\<--\>NodeManager\
-    The protocol used by the ApplicationMaster to talk to the NodeManager to 
-    start/stop containers and get status updates on the containers if needed. 
+    * The three main protocols for YARN application (ApplicationClientProtocol,
+      ApplicationMasterProtocol and ContainerManagementProtocol) are still
+      preserved. The 3 clients wrap these 3 protocols to provide simpler
+      programming model for YARN applications.
+    
+    * Under very rare circumstances, programmer may want to directly use the 3
+      protocols to implement an application. However, note that <such behaviors
+      are no longer encouraged for general use cases>.
+
+    []
 
 * Writing a Simple Yarn Application
 
 ** Writing a simple Client
 
-  * The first step that a client needs to do is to connect to the 
-    ResourceManager or to be more specific, the ApplicationsManager (AsM) 
-    interface of the ResourceManager. 
-
-+---+
-    ApplicationClientProtocol applicationsManager; 
-    YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    InetSocketAddress rmAddress = 
-        NetUtils.createSocketAddr(yarnConf.get(
-            YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS));		
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    configuration appsManagerServerConf = new Configuration(conf);
-    appsManagerServerConf.setClass(
-        YarnConfiguration.YARN_SECURITY_INFO,
-        ClientRMSecurityInfo.class, SecurityInfo.class);
-    applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
-        ApplicationClientProtocol.class, rmAddress, appsManagerServerConf));    
-+---+
-
-  * Once a handle is obtained to the ASM, the client needs to request the 
-    ResourceManager for a new ApplicationId. 
-
-+---+
-    GetNewApplicationRequest request = 
-        Records.newRecord(GetNewApplicationRequest.class);		
-    GetNewApplicationResponse response = 
-        applicationsManager.getNewApplication(request);
-    LOG.info("Got new ApplicationId=" + response.getApplicationId());
-+---+
-
-  * The response from the ASM for a new application also contains information 
-    about the cluster such as the minimum/maximum resource capabilities of the 
-    cluster. This is required so that to ensure that you can correctly set the 
-    specifications of the container in which the ApplicationMaster would be 
-    launched. Please refer to GetNewApplicationResponse for more details. 
-
-  * The main crux of a client is to setup the ApplicationSubmissionContext 
-    which defines all the information needed by the ResourceManager to launch 
-    the ApplicationMaster. A client needs to set the following into the context: 
-    
-    * Application Info: id, name
+  * The first step that a client needs to do is to initialize and start a
+    YarnClient.
 
-    * Queue, Priority info: Queue to which the application will be submitted, 
-      the priority to be assigned for the application. 
-
-    * User: The user submitting the application 
++---+
+  YarnClient yarnClient = YarnClient.createYarnClient();
+  yarnClient.init(conf);
+  yarnClient.start();
++---+
 
-    * ContainerLaunchContext: The information defining the container in which 
-      the ApplicationMaster will be launched and run. The 
-      ContainerLaunchContext, as mentioned previously, defines all the required
-      information needed to run the ApplicationMaster such as the local 
-      resources (binaries, jars, files etc.), security tokens, environment 
-      settings (CLASSPATH etc.) and the command to be executed. 
-       
-    []   
-
-+---+
-    // Create a new ApplicationSubmissionContext
-    ApplicationSubmissionContext appContext = 
-        Records.newRecord(ApplicationSubmissionContext.class);
-    // set the ApplicationId 
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName(appName);
-    
-    // Create a new container launch context for the AM's container
-    ContainerLaunchContext amContainer = 
-        Records.newRecord(ContainerLaunchContext.class);
-
-    // Define the local resources required 
-    Map<String, LocalResource> localResources = 
-        new HashMap<String, LocalResource>();
-    // Lets assume the jar we need for our ApplicationMaster is available in 
-    // HDFS at a certain known path to us and we want to make it available to
-    // the ApplicationMaster in the launched container 
-    Path jarPath; // <- known path to jar file  
-    FileStatus jarStatus = fs.getFileStatus(jarPath);
-    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-    // Set the type of resource - file or archive
-    // archives are untarred at the destination by the framework
-    amJarRsrc.setType(LocalResourceType.FILE);
-    // Set visibility of the resource 
-    // Setting to most private option i.e. this file will only 
-    // be visible to this instance of the running application
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
-    // Set the location of resource to be copied over into the 
-    // working directory
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); 
-    // Set timestamp and length of file so that the framework 
-    // can do basic sanity checks for the local resource 
-    // after it has been copied over to ensure it is the same 
-    // resource the client intended to use with the application
-    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
-    amJarRsrc.setSize(jarStatus.getLen());
-    // The framework will create a symlink called AppMaster.jar in the 
-    // working directory that will be linked back to the actual file. 
-    // The ApplicationMaster, if needs to reference the jar file, would 
-    // need to use the symlink filename.  
-    localResources.put("AppMaster.jar",  amJarRsrc);    
-    // Set the local resources into the launch context    
-    amContainer.setLocalResources(localResources);
-
-    // Set up the environment needed for the launch context
-    Map<String, String> env = new HashMap<String, String>();    
-    // For example, we could setup the classpath needed.
-    // Assuming our classes or jars are available as local resources in the
-    // working directory from which the command will be run, we need to append
-    // "." to the path. 
-    // By default, all the hadoop specific classpaths will already be available 
-    // in $CLASSPATH, so we should be careful not to overwrite it.   
-    String classPathEnv = "$CLASSPATH:./*:";    
-    env.put("CLASSPATH", classPathEnv);
-    amContainer.setEnvironment(env);
-    
-    // Construct the command to be executed on the launched container 
-    String command = 
-        "${JAVA_HOME}" + /bin/java" +
-        " MyAppMaster" + 
-        " arg1 arg2 arg3" + 
-        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";                     
-
-    List<String> commands = new ArrayList<String>();
-    commands.add(command);
-    // add additional commands if needed		
+  * Once a client is set up, the client needs to create an application, and get
+    its application id.
 
-    // Set the command array into the container spec
-    amContainer.setCommands(commands);
-    
-    // Define the resource requirements for the container
-    // For now, YARN only supports memory so we set the memory 
-    // requirements. 
-    // If the process takes more than its allocated memory, it will 
-    // be killed by the framework. 
-    // Memory being requested for should be less than max capability 
-    // of the cluster and all asks should be a multiple of the min capability. 
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(amMemory);
-    amContainer.setResource(capability);
-    
-    // Set the container launch content into the ApplicationSubmissionContext
-    appContext.setAMContainerSpec(amContainer);
++---+
+  YarnClientApplication app = yarnClient.createApplication();
+  GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
 +---+
 
-  * After the setup process is complete, the client is finally ready to submit 
-    the application to the ASM.  
-     
-+---+
-    // Create the request to send to the ApplicationsManager 
-    SubmitApplicationRequest appRequest = 
-        Records.newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-
-    // Submit the application to the ApplicationsManager
-    // Ignore the response as either a valid response object is returned on 
-    // success or an exception thrown to denote the failure
-    applicationsManager.submitApplication(appRequest);
-+---+    
-   
-  * At this point, the ResourceManager will have accepted the application and 
-    in the background, will go through the process of allocating a container 
-    with the required specifications and then eventually setting up and 
-    launching the ApplicationMaster on the allocated container. 
-    
-  * There are multiple ways a client can track progress of the actual task. 
-  
-    * It can communicate with the ResourceManager and request for a report of 
-      the application via ApplicationClientProtocol#getApplicationReport. 
+  * The response from the <<<YarnClientApplication>>> for a new application also
+    contains information about the cluster such as the minimum/maximum resource
+    capabilities of the cluster. This is required so that to ensure that you can
+    correctly set the specifications of the container in which the
+    ApplicationMaster would be launched. Please refer to
+    <<<GetNewApplicationResponse>>> for more details.
+
+  * The main crux of a client is to setup the <<<ApplicationSubmissionContext>>>
+    which defines all the information needed by the RM to launch the AM. A client
+    needs to set the following into the context:
+
+    * Application info: id, name
+
+    * Queue, priority info: Queue to which the application will be submitted,
+      the priority to be assigned for the application.
+
+    * User: The user submitting the application
+
+    * <<<ContainerLaunchContext>>>: The information defining the container in
+      which the AM will be launched and run. The <<<ContainerLaunchContext>>>, as
+      mentioned previously, defines all the required information needed to run
+      the application such as the local <<R>>esources (binaries, jars, files
+      etc.), <<E>>nvironment settings (CLASSPATH etc.), the <<C>>ommand to be
+      executed and security <<T>>okens (<RECT>).
+
+    []
 
-+-----+     
-      GetApplicationReportRequest reportRequest = 
-          Records.newRecord(GetApplicationReportRequest.class);
-      reportRequest.setApplicationId(appId);
-      GetApplicationReportResponse reportResponse = 
-          applicationsManager.getApplicationReport(reportRequest);
-      ApplicationReport report = reportResponse.getApplicationReport();
-+-----+             
-  
-      The ApplicationReport received from the ResourceManager consists of the following: 
-      
-        * General application information: ApplicationId, queue to which the 
-          application was submitted, user who submitted the application and the 
-          start time for the application. 
-          
-        * ApplicationMaster details: the host on which the ApplicationMaster is 
-          running, the rpc port (if any) on which it is listening for requests 
-          from clients and a token that the client needs to communicate with 
-          the ApplicationMaster. 
-          
-        * Application tracking information: If the application supports some 
-          form of progress tracking, it can set a tracking url which is 
-          available via ApplicationReport#getTrackingUrl that a client can look 
-          at to monitor progress. 
-          
-        * ApplicationStatus: The state of the application as seen by the 
-          ResourceManager is available via 
-          ApplicationReport#getYarnApplicationState. If the 
-          YarnApplicationState is set to FINISHED, the client should refer to 
-          ApplicationReport#getFinalApplicationStatus to check for the actual 
-          success/failure of the application task itself. In case of failures, 
-          ApplicationReport#getDiagnostics may be useful to shed some more 
-          light on the the failure.      
- 
-    * If the ApplicationMaster supports it, a client can directly query the 
-      ApplicationMaster itself for progress updates via the host:rpcport 
-      information obtained from the ApplicationReport. It can also use the 
-      tracking url obtained from the report if available.
-
-  * In certain situations, if the application is taking too long or due to 
-    other factors, the client may wish to kill the application. The 
-    ApplicationClientProtocol supports the forceKillApplication call that allows a 
-    client to send a kill signal to the ApplicationMaster via the 
-    ResourceManager. An ApplicationMaster if so designed may also support an 
-    abort call via its rpc layer that a client may be able to leverage.
-
-+---+
-    KillApplicationRequest killRequest = 
-        Records.newRecord(KillApplicationRequest.class);		
-    killRequest.setApplicationId(appId);
-    applicationsManager.forceKillApplication(killRequest);	
-+---+
-
-** Writing an ApplicationMaster
-
-  * The ApplicationMaster is the actual owner of the job. It will be launched 
-    by the ResourceManager and via the client will be provided all the necessary 
-    information and resources about the job that it has been tasked with to 
-    oversee and complete.  
-
-  * As the ApplicationMaster is launched within a container that may (likely 
-    will) be sharing a physical host with other containers, given the 
-    multi-tenancy nature, amongst other issues, it cannot make any assumptions 
-    of things like pre-configured ports that it can listen on. 
-  
-  * When the ApplicationMaster starts up, several parameters are made available
-    to it via the environment. These include the ContainerId for the
-    ApplicationMaster container, the application submission time and details
-    about the NodeManager host running the Application Master.
-    Ref ApplicationConstants for parameter names.
-
-  * All interactions with the ResourceManager require an ApplicationAttemptId 
-    (there can be multiple attempts per application in case of failures). The 
-    ApplicationAttemptId can be obtained from the ApplicationMaster
-    containerId. There are helper apis to convert the value obtained from the
-    environment into objects.
-    
 +---+
-    Map<String, String> envs = System.getenv();
-    String containerIdString = 
-        envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
-    if (containerIdString == null) {
-      // container id should always be set in the env by the framework 
-      throw new IllegalArgumentException(
-          "ContainerId not set in the environment");
+  // set the application submission context
+  ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+  ApplicationId appId = appContext.getApplicationId();
+
+  appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
+  appContext.setApplicationName(appName);
+
+  // set local resources for the application master
+  // local files or archives as needed
+  // In this scenario, the jar file for the application master is part of the local resources
+  Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+  LOG.info("Copy App Master jar from local filesystem and add to local environment");
+  // Copy the application master jar to the filesystem
+  // Create a local resource to point to the destination jar path
+  FileSystem fs = FileSystem.get(conf);
+  addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
+      localResources, null);
+
+  // Set the log4j properties if needed
+  if (!log4jPropFile.isEmpty()) {
+    addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
+        localResources, null);
+  }
+
+  // The shell script has to be made available on the final container(s)
+  // where it will be executed.
+  // To do this, we need to first copy into the filesystem that is visible
+  // to the yarn framework.
+  // We do not need to set this as a local resource for the application
+  // master as the application master does not need it.
+  String hdfsShellScriptLocation = "";
+  long hdfsShellScriptLen = 0;
+  long hdfsShellScriptTimestamp = 0;
+  if (!shellScriptPath.isEmpty()) {
+    Path shellSrc = new Path(shellScriptPath);
+    String shellPathSuffix =
+        appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
+    Path shellDst =
+        new Path(fs.getHomeDirectory(), shellPathSuffix);
+    fs.copyFromLocalFile(false, true, shellSrc, shellDst);
+    hdfsShellScriptLocation = shellDst.toUri().toString();
+    FileStatus shellFileStatus = fs.getFileStatus(shellDst);
+    hdfsShellScriptLen = shellFileStatus.getLen();
+    hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
+  }
+
+  if (!shellCommand.isEmpty()) {
+    addToLocalResources(fs, null, shellCommandPath, appId.toString(),
+        localResources, shellCommand);
+  }
+
+  if (shellArgs.length > 0) {
+    addToLocalResources(fs, null, shellArgsPath, appId.toString(),
+        localResources, StringUtils.join(shellArgs, " "));
+  }
+
+  // Set the env variables to be setup in the env where the application master will be run
+  LOG.info("Set the environment for the application master");
+  Map<String, String> env = new HashMap<String, String>();
+
+  // put location of shell script into env
+  // using the env info, the application master will create the correct local resource for the
+  // eventual containers that will be launched to execute the shell scripts
+  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
+  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
+  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
+
+  // Add AppMaster.jar location to classpath
+  // At some point we should not be required to add
+  // the hadoop specific classpaths to the env.
+  // It should be provided out of the box.
+  // For now setting all required classpaths including
+  // the classpath to "." for the application jar
+  StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
+    .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
+  for (String c : conf.getStrings(
+      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+      YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
+    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
+    classPathEnv.append(c.trim());
+  }
+  classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
+    "./log4j.properties");
+
+  // Set the necessary command to execute the application master
+  Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+  // Set java executable command
+  LOG.info("Setting up app master command");
+  vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
+  // Set Xmx based on am memory size
+  vargs.add("-Xmx" + amMemory + "m");
+  // Set class name
+  vargs.add(appMasterMainClass);
+  // Set params for Application Master
+  vargs.add("--container_memory " + String.valueOf(containerMemory));
+  vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
+  vargs.add("--num_containers " + String.valueOf(numContainers));
+  vargs.add("--priority " + String.valueOf(shellCmdPriority));
+
+  for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
+    vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
+  }
+  if (debugFlag) {
+    vargs.add("--debug");
+  }
+
+  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+  // Get final commmand
+  StringBuilder command = new StringBuilder();
+  for (CharSequence str : vargs) {
+    command.append(str).append(" ");
+  }
+
+  LOG.info("Completed setting up app master command " + command.toString());
+  List<String> commands = new ArrayList<String>();
+  commands.add(command.toString());
+
+  // Set up the container launch context for the application master
+  ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
+    localResources, env, commands, null, null, null);
+
+  // Set up resource type requirements
+  // For now, both memory and vcores are supported, so we set memory and
+  // vcores requirements
+  Resource capability = Resource.newInstance(amMemory, amVCores);
+  appContext.setResource(capability);
+
+  // Service data is a binary blob that can be passed to the application
+  // Not needed in this scenario
+  // amContainer.setServiceData(serviceData);
+
+  // Setup security tokens
+  if (UserGroupInformation.isSecurityEnabled()) {
+    // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
+    Credentials credentials = new Credentials();
+    String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
+    if (tokenRenewer == null || tokenRenewer.length() == 0) {
+      throw new IOException(
+        "Can't get Master Kerberos principal for the RM to use as renewer");
     }
-    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
-    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
-+---+     
-  
-  * After an ApplicationMaster has initialized itself completely, it needs to 
-    register with the ResourceManager via 
-    ApplicationMasterProtocol#registerApplicationMaster. The ApplicationMaster always 
-    communicate via the Scheduler interface of the ResourceManager. 
-  
+
+    // For now, only getting tokens for the default file-system.
+    final Token<?> tokens[] =
+        fs.addDelegationTokens(tokenRenewer, credentials);
+    if (tokens != null) {
+      for (Token<?> token : tokens) {
+        LOG.info("Got dt for " + fs.getUri() + "; " + token);
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    amContainer.setTokens(fsTokens);
+  }
+
+  appContext.setAMContainerSpec(amContainer);
 +---+
-    // Connect to the Scheduler of the ResourceManager. 
-    YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    InetSocketAddress rmAddress = 
-        NetUtils.createSocketAddr(yarnConf.get(
-            YarnConfiguration.RM_SCHEDULER_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));		
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationMasterProtocol resourceManager = 
-        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
-
-    // Register the AM with the RM
-    // Set the required info into the registration request: 
-    // ApplicationAttemptId, 
-    // host on which the app master is running
-    // rpc port on which the app master accepts requests from the client 
-    // tracking url for the client to track app master progress
-    RegisterApplicationMasterRequest appMasterRequest = 
-        Records.newRecord(RegisterApplicationMasterRequest.class);
-    appMasterRequest.setApplicationAttemptId(appAttemptID);	
-    appMasterRequest.setHost(appMasterHostname);
-    appMasterRequest.setRpcPort(appMasterRpcPort);
-    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-
-    // The registration response is useful as it provides information about the 
-    // cluster. 
-    // Similar to the GetNewApplicationResponse in the client, it provides 
-    // information about the min/mx resource capabilities of the cluster that 
-    // would be needed by the ApplicationMaster when requesting for containers.
-    RegisterApplicationMasterResponse response = 
-        resourceManager.registerApplicationMaster(appMasterRequest);
-+---+
-     
-  * The ApplicationMaster has to emit heartbeats to the ResourceManager to keep 
-    it informed that the ApplicationMaster is alive and still running. The 
-    timeout expiry interval at the ResourceManager is defined by a config 
-    setting accessible via YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS with the 
-    default being defined by YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS. 
-    The ApplicationMasterProtocol#allocate calls to the ResourceManager count as heartbeats 
-    as it also supports sending progress update information. Therefore, an 
-    allocate call with no containers requested and progress information updated 
-    if any is a valid way for making heartbeat calls to the ResourceManager. 
-    
-  * Based on the task requirements, the ApplicationMaster can ask for a set of 
-    containers to run its tasks on. The ApplicationMaster has to use the 
-    ResourceRequest class to define the following container specifications: 
-    
-    * Hostname: If containers are required to be hosted on a particular rack or 
-      a specific host. '*' is a special value that implies any host will do. 
-      
-    * Resource capability: Currently, YARN only supports memory based resource 
-      requirements so the request should define how much memory is needed. The 
-      value is defined in MB and has to less than the max capability of the 
-      cluster and an exact multiple of the min capability. Memory resources
-      correspond to physical memory limits imposed on the task containers.
-      
-    * Priority: When asking for sets of containers, an ApplicationMaster may 
-      define different priorities to each set. For example, the Map-Reduce 
-      ApplicationMaster may assign a higher priority to containers needed 
-      for the Map tasks and a lower priority for the Reduce tasks' containers.
-      
-    []     
-       
-+----+ 
-    // Resource Request
-    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
-    // setup requirements for hosts 
-    // whether a particular rack/host is needed 
-    // useful for applications that are sensitive
-    // to data locality 
-    rsrcRequest.setHostName("*");
 
-    // set the priority for the request
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(requestPriority);
-    rsrcRequest.setPriority(pri);	    
+  * After the setup process is complete, the client is ready to submit
+    the application with specified priority and queue.
 
-    // Set up resource type requirements
-    // For now, only memory is supported so we set memory requirements
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(containerMemory);
-    rsrcRequest.setCapability(capability);
-
-    // set no. of containers needed
-    // matching the specifications
-    rsrcRequest.setNumContainers(numContainers);
-+---+
-        
-  * After defining the container requirements, the ApplicationMaster has to 
-    construct an AllocateRequest to send to the ResourceManager. 
-    The AllocateRequest consists of:
-        
-    * Requested containers: The container specifications and the no. of 
-      containers being requested for by the ApplicationMaster from the 
-      ResourceManager. 
-    
-    * Released containers: There may be situations when the ApplicationMaster 
-      may have requested for more containers that it needs or due to failure 
-      issues, decide to use other containers allocated to it. In all such 
-      situations, it is beneficial to the cluster if the ApplicationMaster 
-      releases these containers back to the ResourceManager so that they can be 
-      re-allocated to other applications.   
-    
-    * ResponseId: The response id that will be sent back in the response from 
-      the allocate call.  
-     
-    * Progress update information: The ApplicationMaster can send its progress 
-      update (range between to 0 to 1) to the ResourceManager. 
-    
-    []
-    
 +---+
-    List<ResourceRequest> requestedContainers;
-    List<ContainerId> releasedContainers    
-    AllocateRequest req = Records.newRecord(AllocateRequest.class);
-
-    // The response id set in the request will be sent back in 
-    // the response so that the ApplicationMaster can 
-    // match it to its original ask and act appropriately.
-    req.setResponseId(rmRequestID);
-    
-    // Set ApplicationAttemptId 
-    req.setApplicationAttemptId(appAttemptID);
-    
-    // Add the list of containers being asked for 
-    req.addAllAsks(requestedContainers);
-    
-    // If the ApplicationMaster has no need for certain 
-    // containers due to over-allocation or for any other
-    // reason, it can release them back to the ResourceManager
-    req.addAllReleases(releasedContainers);
-    
-    // Assuming the ApplicationMaster can track its progress
-    req.setProgress(currentProgress);
-    
-    AllocateResponse allocateResponse = resourceManager.allocate(req);		     
+  // Set the priority for the application master
+  Priority pri = Priority.newInstance(amPriority);
+  appContext.setPriority(pri);
+
+  // Set the queue to which this application is to be submitted in the RM
+  appContext.setQueue(amQueue);
+
+  // Submit the application to the applications manager
+  // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
+
+  yarnClient.submitApplication(appContext);
 +---+
-    
-  * The AllocateResponse sent back from the ResourceManager provides the 
-    following information:
-  
-    * Reboot flag: For scenarios when the ApplicationMaster may get out of sync 
-      with the ResourceManager. 
-    
-    * Allocated containers: The containers that have been allocated to the 
-      ApplicationMaster.
-    
-    * Headroom: Headroom for resources in the cluster. Based on this information 
-      and knowing its needs, an ApplicationMaster can make intelligent decisions 
-      such as re-prioritizing sub-tasks to take advantage of currently allocated 
-      containers, bailing out faster if resources are not becoming available 
-      etc.         
-    
-    * Completed containers: Once an ApplicationMaster triggers a launch an 
-      allocated container, it will receive an update from the ResourceManager 
-      when the container completes. The ApplicationMaster can look into the 
-      status of the completed container and take appropriate actions such as 
-      re-trying a particular sub-task in case of a failure.
-
-    * Number of cluster nodes: The number of hosts available on the cluster.
-      
-    [] 
-      
-    One thing to note is that containers will not be immediately allocated to 
-    the ApplicationMaster. This does not imply that the ApplicationMaster should 
-    keep on asking the pending count of required containers. Once an allocate 
-    request has been sent, the ApplicationMaster will eventually be allocated 
-    the containers based on cluster capacity, priorities and the scheduling 
-    policy in place. The ApplicationMaster should only request for containers 
-    again if and only if its original estimate changed and it needs additional 
-    containers. 
 
+  * At this point, the RM will have accepted the application and in the
+    background, will go through the process of allocating a container with the
+    required specifications and then eventually setting up and launching the AM
+    on the allocated container.
+
+  * There are multiple ways a client can track progress of the actual task.
+
+    * It can communicate with the RM and request for a report of the application
+      via the <<<getApplicationReport()>>> method of <<<YarnClient>>>.
+
++-----+
+  // Get application report for the appId we are interested in
+  ApplicationReport report = yarnClient.getApplicationReport(appId);
++-----+
+
+      The <<<ApplicationReport>>> received from the RM consists of the following:
+
+        * General application information: Application id, queue to which the
+          application was submitted, user who submitted the application and the
+          start time for the application.
+
+        * ApplicationMaster details: the host on which the AM is running, the
+          rpc port (if any) on which it is listening for requests from clients
+          and a token that the client needs to communicate with the AM.
+
+        * Application tracking information: If the application supports some form
+          of progress tracking, it can set a tracking url which is available via
+          <<<ApplicationReport>>>'s <<<getTrackingUrl()>>> method that a client
+          can look at to monitor progress.
+
+        * Application status: The state of the application as seen by the
+          ResourceManager is available via
+          <<<ApplicationReport#getYarnApplicationState>>>. If the
+          <<<YarnApplicationState>>> is set to <<<FINISHED>>>, the client should
+          refer to <<<ApplicationReport#getFinalApplicationStatus>>> to check for
+          the actual success/failure of the application task itself. In case of
+          failures, <<<ApplicationReport#getDiagnostics>>> may be useful to shed
+          some more light on the the failure.
+
+    * If the ApplicationMaster supports it, a client can directly query the AM
+      itself for progress updates via the host:rpcport information obtained from
+      the application report. It can also use the tracking url obtained from the
+      report if available.
+
+  * In certain situations, if the application is taking too long or due to other
+    factors, the client may wish to kill the application. <<<YarnClient>>>
+    supports the <<<killApplication>>> call that allows a client to send a kill
+    signal to the AM via the ResourceManager. An ApplicationMaster if so
+    designed may also support an abort call via its rpc layer that a client may
+    be able to leverage.
+
++---+
+  yarnClient.killApplication(appId);
 +---+
 
-    // Retrieve list of allocated containers from the response 
-    // and on each allocated container, lets assume we are launching 
-    // the same job.
-    List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
+** Writing an ApplicationMaster (AM)
+
+  * The AM is the actual owner of the job. It will be launched
+    by the RM and via the client will be provided all the
+    necessary information and resources about the job that it has been tasked
+    with to oversee and complete.
+
+  * As the AM is launched within a container that may (likely
+    will) be sharing a physical host with other containers, given the
+    multi-tenancy nature, amongst other issues, it cannot make any assumptions
+    of things like pre-configured ports that it can listen on.
+
+  * When the AM starts up, several parameters are made available
+    to it via the environment. These include the <<<ContainerId>>> for the
+    AM container, the application submission time and details
+    about the NM (NodeManager) host running the ApplicationMaster.
+    Ref <<<ApplicationConstants>>> for parameter names.
+
+  * All interactions with the RM require an <<<ApplicationAttemptId>>> (there can
+    be multiple attempts per application in case of failures). The
+    <<<ApplicationAttemptId>>> can be obtained from the AM's container id. There
+    are helper APIs to convert the value obtained from the environment into
+    objects.
+
++---+
+  Map<String, String> envs = System.getenv();
+  String containerIdString =
+      envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+  if (containerIdString == null) {
+    // container id should always be set in the env by the framework
+    throw new IllegalArgumentException(
+        "ContainerId not set in the environment");
+  }
+  ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+  ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
++---+
+
+  * After an AM has initialized itself completely, we can start the two clients:
+    one to ResourceManager, and one to NodeManagers. We set them up with our
+    customized event handler, and we will talk about those event handlers in
+    detail later in this article.
+
++---+
+  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+  amRMClient.init(conf);
+  amRMClient.start();
+
+  containerListener = createNMCallbackHandler();
+  nmClientAsync = new NMClientAsyncImpl(containerListener);
+  nmClientAsync.init(conf);
+  nmClientAsync.start();
++---+
+
+  * The AM has to emit heartbeats to the RM to keep it informed that the AM is
+    alive and still running. The timeout expiry interval at the RM is defined by
+    a config setting accessible via
+    <<<YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS>>> with the default being
+    defined by <<<YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS>>>. The
+    ApplicationMaster needs to register itself with the ResourceManager to
+    start hearbeating.
+
++---+
+  // Register self with ResourceManager
+  // This will start heartbeating to the RM
+  appMasterHostname = NetUtils.getHostname();
+  RegisterApplicationMasterResponse response = amRMClient
+      .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+          appMasterTrackingUrl);
++---+
+
+  * In the response of the registration, maximum resource capability if included. You may want to use this to check the application's request.
+
++---+
+  // Dump out information about cluster capability as seen by the
+  // resource manager
+  int maxMem = response.getMaximumResourceCapability().getMemory();
+  LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+  int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
+  LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
+
+  // A resource ask cannot exceed the max.
+  if (containerMemory > maxMem) {
+    LOG.info("Container memory specified above max threshold of cluster."
+        + " Using max value." + ", specified=" + containerMemory + ", max="
+        + maxMem);
+    containerMemory = maxMem;
+  }
+
+  if (containerVirtualCores > maxVCores) {
+    LOG.info("Container virtual cores specified above max threshold of  cluster."
+      + " Using max value." + ", specified=" + containerVirtualCores + ", max="
+      + maxVCores);
+    containerVirtualCores = maxVCores;
+  }
+  List<Container> previousAMRunningContainers =
+      response.getContainersFromPreviousAttempts();
+  LOG.info("Received " + previousAMRunningContainers.size()
+          + " previous AM's running containers on AM registration.");
++---+
+
+  * Based on the task requirements, the AM can ask for a set of containers to run
+    its tasks on. We can now calculate how many containers we need, and request
+    those many containers.
+
++---+
+  List<Container> previousAMRunningContainers =
+      response.getContainersFromPreviousAttempts();
+  List<Container> previousAMRunningContainers =
+      response.getContainersFromPreviousAttempts();
+  LOG.info("Received " + previousAMRunningContainers.size()
+      + " previous AM's running containers on AM registration.");
+
+  int numTotalContainersToRequest =
+      numTotalContainers - previousAMRunningContainers.size();
+  // Setup ask for containers from RM
+  // Send request for containers to RM
+  // Until we get our fully allocated quota, we keep on polling RM for
+  // containers
+  // Keep looping until all the containers are launched and shell script
+  // executed on them ( regardless of success/failure).
+  for (int i = 0; i < numTotalContainersToRequest; ++i) {
+    ContainerRequest containerAsk = setupContainerAskForRM();
+    amRMClient.addContainerRequest(containerAsk);
+  }
++---+
+
+  * In <<<setupContainerAskForRM()>>>, the follow two things need some set up:
+
+    * Resource capability: Currently, YARN supports memory based resource
+      requirements so the request should define how much memory is needed. The
+      value is defined in MB and has to less than the max capability of the
+      cluster and an exact multiple of the min capability. Memory resources
+      correspond to physical memory limits imposed on the task containers. It
+      will also support computation based resource (vCore), as shown in the code.
+
+    * Priority: When asking for sets of containers, an AM may define different
+      priorities to each set. For example, the Map-Reduce AM may assign a higher
+      priority to containers needed for the Map tasks and a lower priority for
+      the Reduce tasks' containers.
+
+    []
+
++---+
+  private ContainerRequest setupContainerAskForRM() {
+    // setup requirements for hosts
+    // using * as any host will do for the distributed shell app
+    // set the priority for the request
+    Priority pri = Priority.newInstance(requestPriority);
+
+    // Set up resource type requirements
+    // For now, memory and CPU are supported so we set memory and cpu requirements
+    Resource capability = Resource.newInstance(containerMemory,
+      containerVirtualCores);
+
+    ContainerRequest request = new ContainerRequest(capability, null, null,
+        pri);
+    LOG.info("Requested container ask: " + request.toString());
+    return request;
+  }
++---+
+
+  * After container allocation requests have been sent by the application
+    manager, contailers will be launched asynchronously, by the event handler of
+    the <<<AMRMClientAsync>>> client. The handler should implement
+    <<<AMRMClientAsync.CallbackHandler>>> interface.
+
+    * When there are containers allocated, the handler sets up a thread that runs
+      the code to launch containers. Here we use the name
+      <<<LaunchContainerRunnable>>> to demonstrate. We will talk about the
+      <<<LaunchContainerRunnable>>> class in the following part of this article.
+
++---+
+  @Override
+  public void onContainersAllocated(List<Container> allocatedContainers) {
+    LOG.info("Got response from RM for container ask, allocatedCnt="
+        + allocatedContainers.size());
+    numAllocatedContainers.addAndGet(allocatedContainers.size());
     for (Container allocatedContainer : allocatedContainers) {
-      LOG.info("Launching shell command on a new container."
-          + ", containerId=" + allocatedContainer.getId()
-          + ", containerNode=" + allocatedContainer.getNodeId().getHost() 
-          + ":" + allocatedContainer.getNodeId().getPort()
-          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-          + ", containerState" + allocatedContainer.getState()
-          + ", containerResourceMemory"  
-          + allocatedContainer.getResource().getMemory());
-          
-          
-      // Launch and start the container on a separate thread to keep the main 
-      // thread unblocked as all containers may not be allocated at one go.
-      LaunchContainerRunnable runnableLaunchContainer = 
-          new LaunchContainerRunnable(allocatedContainer);
-      Thread launchThread = new Thread(runnableLaunchContainer);	
+      LaunchContainerRunnable runnableLaunchContainer =
+          new LaunchContainerRunnable(allocatedContainer, containerListener);
+      Thread launchThread = new Thread(runnableLaunchContainer);
+
+      // launch and start the container on a separate thread to keep
+      // the main thread unblocked
+      // as all containers may not be allocated at one go.
       launchThreads.add(launchThread);
       launchThread.start();
     }
+  }
++---+
 
-    // Check what the current available resources in the cluster are
-    Resource availableResources = allocateResponse.getAvailableResources();
-    // Based on this information, an ApplicationMaster can make appropriate 
-    // decisions
-
-    // Check the completed containers
-    // Let's assume we are keeping a count of total completed containers, 
-    // containers that failed and ones that completed successfully.  			
-    List<ContainerStatus> completedContainers = 
-        allocateResponse.getCompletedContainersStatuses();
-    for (ContainerStatus containerStatus : completedContainers) {				
-      LOG.info("Got container status for containerID= " 
-          + containerStatus.getContainerId()
-          + ", state=" + containerStatus.getState()	
-          + ", exitStatus=" + containerStatus.getExitStatus() 
-          + ", diagnostics=" + containerStatus.getDiagnostics());
-
-      int exitStatus = containerStatus.getExitStatus();
-      if (0 != exitStatus) {
-        // container failed 
-        // -100 is a special case where the container 
-        // was aborted/pre-empted for some reason 
-        if (-100 != exitStatus) {
-          // application job on container returned a non-zero exit code
-          // counts as completed 
-          numCompletedContainers.incrementAndGet();
-          numFailedContainers.incrementAndGet();							
-        }
-        else { 
-          // something else bad happened 
-          // app job did not complete for some reason 
-          // we should re-try as the container was lost for some reason
-          // decrementing the requested count so that we ask for an
-          // additional one in the next allocate call.          
-          numRequestedContainers.decrementAndGet();
-          // we do not need to release the container as that has already 
-          // been done by the ResourceManager/NodeManager. 
-        }
-        }
-        else { 
-          // nothing to do 
-          // container completed successfully 
-          numCompletedContainers.incrementAndGet();
-          numSuccessfulContainers.incrementAndGet();
-        }
-      }
-    }
-+---+      
+    * On heart beat, the event handler reports the progress of the application.
 
-    
-  * After a container has been allocated to the ApplicationMaster, it needs to 
-    follow a similar process that the Client followed in setting up the 
-    ContainerLaunchContext for the eventual task that is going to be running on 
-    the allocated Container. Once the ContainerLaunchContext is defined, the 
-    ApplicationMaster can then communicate with the ContainerManager to start 
-    its allocated container.
-       
-+---+
-       
-    //Assuming an allocated Container obtained from AllocateResponse
-    Container container;   
-    // Connect to ContainerManager on the allocated container 
-    String cmIpPortStr = container.getNodeId().getHost() + ":" 
-        + container.getNodeId().getPort();		
-    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);		
-    ContainerManager cm = 
-        (ContainerManager)rpc.getProxy(ContainerManager.class, cmAddress, conf);     
-
-    // Now we setup a ContainerLaunchContext  
-    ContainerLaunchContext ctx = 
-        Records.newRecord(ContainerLaunchContext.class);
-
-    ctx.setContainerId(container.getId());
-    ctx.setResource(container.getResource());
-
-    try {
-      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-    } catch (IOException e) {
-      LOG.info(
-          "Getting current user failed when trying to launch the container",
-          + e.getMessage());
-    }
++---+
+  @Override
+  public float getProgress() {
+    // set progress to deliver to RM on next heartbeat
+    float progress = (float) numCompletedContainers.get()
+        / numTotalContainers;
+    return progress;
+  }
++---+
 
-    // Set the environment 
-    Map<String, String> unixEnv;
-    // Setup the required env. 
-    // Please note that the launched container does not inherit 
-    // the environment of the ApplicationMaster so all the 
-    // necessary environment settings will need to be re-setup 
-    // for this allocated container.      
-    ctx.setEnvironment(unixEnv);
-
-    // Set the local resources 
-    Map<String, LocalResource> localResources = 
-        new HashMap<String, LocalResource>();
-    // Again, the local resources from the ApplicationMaster is not copied over 
-    // by default to the allocated container. Thus, it is the responsibility 
- 	  // of the ApplicationMaster to setup all the necessary local resources 
- 	  // needed by the job that will be executed on the allocated container. 
-      
-    // Assume that we are executing a shell script on the allocated container 
-    // and the shell script's location in the filesystem is known to us. 
-    Path shellScriptPath; 
-    LocalResource shellRsrc = Records.newRecord(LocalResource.class);
-    shellRsrc.setType(LocalResourceType.FILE);
-    shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
-    shellRsrc.setResource(
-        ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
-    shellRsrc.setTimestamp(shellScriptPathTimestamp);
-    shellRsrc.setSize(shellScriptPathLen);
-    localResources.put("MyExecShell.sh", shellRsrc);
-
-    ctx.setLocalResources(localResources);			
-
-    // Set the necessary command to execute on the allocated container 
-    String command = "/bin/sh ./MyExecShell.sh"
-        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
-        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
-
-    List<String> commands = new ArrayList<String>();
-    commands.add(command);
-    ctx.setCommands(commands);
-
-    // Send the start request to the ContainerManager
-    StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
-    startReq.setContainerLaunchContext(ctx);
-    cm.startContainer(startReq);
-+---+                
-      
-  * The ApplicationMaster, as mentioned previously, will get updates of 
-    completed containers as part of the response from the ApplicationMasterProtocol#allocate 
-    calls. It can also monitor its launched containers pro-actively by querying 
-    the ContainerManager for the status. 
-    
+    []
+
+  * The container launch thread actually launches the containers on NMs. After a
+    container has been allocated to the AM, it needs to follow a similar process
+    that the client followed in setting up the <<<ContainerLaunchContext>>> for
+    the eventual task that is going to be running on the allocated Container.
+    Once the <<<ContainerLaunchContext>>> is defined, the AM can start it through
+    the <<<NMClientAsync>>>.
+
++---+
+  // Set the necessary command to execute on the allocated container
+  Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+
+  // Set executable command
+  vargs.add(shellCommand);
+  // Set shell script path
+  if (!scriptPath.isEmpty()) {
+    vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
+      : ExecShellStringPath);
+  }
+
+  // Set args for the shell command if any
+  vargs.add(shellArgs);
+  // Add log redirect params
+  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+  // Get final commmand
+  StringBuilder command = new StringBuilder();
+  for (CharSequence str : vargs) {
+    command.append(str).append(" ");
+  }
+
+  List<String> commands = new ArrayList<String>();
+  commands.add(command.toString());
+
+  // Set up ContainerLaunchContext, setting local resource, environment,
+  // command and token for constructor.
+
+  // Note for tokens: Set up tokens for the container too. Today, for normal
+  // shell commands, the container in distribute-shell doesn't need any
+  // tokens. We are populating them mainly for NodeManagers to be able to
+  // download anyfiles in the distributed file-system. The tokens are
+  // otherwise also useful in cases, for e.g., when one is running a
+  // "hadoop dfs" command inside the distributed shell.
+  ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
+    localResources, shellEnv, commands, null, allTokens.duplicate(), null);
+  containerListener.addContainer(container.getId(), container);
+  nmClientAsync.startContainerAsync(container, ctx);
 +---+
 
-    GetContainerStatusRequest statusReq = 
-        Records.newRecord(GetContainerStatusRequest.class);
-    statusReq.setContainerId(container.getId());
-    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
-    LOG.info("Container Status"
-        + ", id=" + container.getId()
-        + ", status=" + statusResp.getStatus());
-+---+      
+  * The <<<NMClientAsync>>> object, together with its event handler, handles container events. Including container start, stop, status update, and occurs an error.
+  
+  * After the ApplicationMaster determines the work is done, it needs to unregister itself through the AM-RM client, and then stops the client. 
+
++---+
+  try {
+    amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+  } catch (YarnException ex) {
+    LOG.error("Failed to unregister application", ex);
+  } catch (IOException e) {
+    LOG.error("Failed to unregister application", e);
+  }
+  
+  amRMClient.stop();
++---+
 
 ~~** Defining the context in which your code runs
 
-~~*** Container Resource Requests 
+~~*** Container Resource Requests
 
-~~*** Local Resources 
+~~*** Local Resources
 
-~~*** Environment 
+~~*** Environment
 
-~~**** Managing the CLASSPATH 
+~~**** Managing the CLASSPATH
 
-~~** Security 
+~~** Security
 
-* FAQ 
+* FAQ
 
-** How can I distribute my application's jars to all of the nodes in the YARN 
+** How can I distribute my application's jars to all of the nodes in the YARN
    cluster that need it?
 
-  You can use the LocalResource to add resources to your application request. 
-  This will cause YARN to distribute the resource to the ApplicationMaster node. 
-  If the resource is a tgz, zip, or jar - you can have YARN unzip it. Then, all 
-  you need to do is add the unzipped folder to your classpath. 
-  For example, when creating your application request:
-
-+---+
-    File packageFile = new File(packagePath);
-    Url packageUrl = ConverterUtils.getYarnUrlFromPath(
-        FileContext.getFileContext.makeQualified(new Path(packagePath)));
-
-    packageResource.setResource(packageUrl);
-    packageResource.setSize(packageFile.length());
-    packageResource.setTimestamp(packageFile.lastModified());
-    packageResource.setType(LocalResourceType.ARCHIVE);
-    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
-    resource.setMemory(memory)
-    containerCtx.setResource(resource)
-    containerCtx.setCommands(ImmutableList.of(
-        "java -cp './package/*' some.class.to.Run "
-        + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
-        + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"))
-    containerCtx.setLocalResources(
-        Collections.singletonMap("package", packageResource))
-    appCtx.setApplicationId(appId)
-    appCtx.setUser(user.getShortUserName)
-    appCtx.setAMContainerSpec(containerCtx)
-    request.setApplicationSubmissionContext(appCtx)
-    applicationsManager.submitApplication(request)
-+---+
-
-  As you can see, the setLocalResources command takes a map of names to 
-  resources. The name becomes a sym link in your application's cwd, so you can 
-  just refer to the artifacts inside by using ./package/*. 
-  
-  Note: Java's classpath (cp) argument is VERY sensitive. 
+  * You can use the LocalResource to add resources to your application request.
+    This will cause YARN to distribute the resource to the ApplicationMaster
+    node. If the resource is a tgz, zip, or jar - you can have YARN unzip it.
+    Then, all you need to do is add the unzipped folder to your classpath. For
+    example, when creating your application request:
+
++---+
+  File packageFile = new File(packagePath);
+  Url packageUrl = ConverterUtils.getYarnUrlFromPath(
+      FileContext.getFileContext.makeQualified(new Path(packagePath)));
+
+  packageResource.setResource(packageUrl);
+  packageResource.setSize(packageFile.length());
+  packageResource.setTimestamp(packageFile.lastModified());
+  packageResource.setType(LocalResourceType.ARCHIVE);
+  packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+  resource.setMemory(memory);
+  containerCtx.setResource(resource);
+  containerCtx.setCommands(ImmutableList.of(
+      "java -cp './package/*' some.class.to.Run "
+      + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
+      + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
+  containerCtx.setLocalResources(
+      Collections.singletonMap("package", packageResource));
+  appCtx.setApplicationId(appId);
+  appCtx.setUser(user.getShortUserName);
+  appCtx.setAMContainerSpec(containerCtx);
+  yarnClient.submitApplication(appCtx);
++---+
+
+  As you can see, the <<<setLocalResources>>> command takes a map of names to
+  resources. The name becomes a sym link in your application's cwd, so you can
+  just refer to the artifacts inside by using ./package/*.
+
+  Note: Java's classpath (cp) argument is VERY sensitive.
   Make sure you get the syntax EXACTLY correct.
 
-  Once your package is distributed to your ApplicationMaster, you'll need to 
-  follow the same process whenever your ApplicationMaster starts a new container 
-  (assuming you want the resources to be sent to your container). The code for 
-  this is the same. You just need to make sure that you give your 
-  ApplicationMaster the package path (either HDFS, or local), so that it can 
-  send the resource URL along with the container ctx.
-
-** How do I get the ApplicationMaster's ApplicationAttemptId? 
-
-
-  The ApplicationAttemptId will be passed to the ApplicationMaster via the 
-  environment and the value from the environment can be converted into an 
-  ApplicationAttemptId object via the ConverterUtils helper function.
-
-** My container is being killed by the Node Manager
-
-  This is likely due to high memory usage exceeding your requested container 
-  memory size. There are a number of reasons that can cause this. First, look 
-  at the process tree that the node manager dumps when it kills your container. 
-  The two things you're interested in are physical memory and virtual memory. 
-  If you have exceeded physical memory limits your app is using too much physical 
-  memory. If you're running a Java app, you can use -hprof to look at what is 
-  taking up space in the heap. If you have exceeded virtual memory, you may
-  need to increase the value of the the cluster-wide configuration variable
-  <<<yarn.nodemanager.vmem-pmem-ratio>>>.
+  Once your package is distributed to your AM, you'll need to follow the same
+  process whenever your AM starts a new container (assuming you want the
+  resources to be sent to your container). The code for this is the same. You
+  just need to make sure that you give your AM the package path (either HDFS, or
+  local), so that it can send the resource URL along with the container ctx.
+
+** How do I get the ApplicationMaster's <<<ApplicationAttemptId>>>?
+
+  * The <<<ApplicationAttemptId>>> will be passed to the AM via the environment
+    and the value from the environment can be converted into an
+    <<<ApplicationAttemptId>>> object via the ConverterUtils helper function.
+
+** Why my container is killed by the NodeManager?
+
+  * This is likely due to high memory usage exceeding your requested container
+    memory size. There are a number of reasons that can cause this. First, look
+    at the process tree that the NodeManager dumps when it kills your container.
+    The two things you're interested in are physical memory and virtual memory.
+    If you have exceeded physical memory limits your app is using too much
+    physical memory. If you're running a Java app, you can use -hprof to look at
+    what is taking up space in the heap. If you have exceeded virtual memory, you
+    may need to increase the value of the the cluster-wide configuration variable
+    <<<yarn.nodemanager.vmem-pmem-ratio>>>.
 
 ** How do I include native libraries?
 
-
-  Setting -Djava.library.path on the command line while launching a container 
-  can cause native libraries used by Hadoop to not be loaded correctly and can
-  result in errors. It is cleaner to use LD_LIBRARY_PATH instead.
+  * Setting <<<-Djava.library.path>>> on the command line while launching a
+    container can cause native libraries used by Hadoop to not be loaded
+    correctly and can result in errors. It is cleaner to use
+    <<<LD_LIBRARY_PATH>>> instead.
 
 * Useful Links
 
-  * {{{https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf}Map Reduce Next Generation Architecture}}
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html}YARN Architecture}}
+
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html}YARN Capacity Scheduler}}
+
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html}YARN Fair Scheduler}}
+
+* Sample code
 
-  * {{{http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/}Map Reduce Next Generation Scheduler}}
+  * Yarn distributed shell: in <<<hadoop-yarn-applications-distributedshell>>>
+    project after you set up your development environment.