Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [26/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm Tue Aug 19 23:49:39 2014
@@ -11,784 +11,747 @@
~~ limitations under the License. See accompanying LICENSE file.
---
- Hadoop Map Reduce Next Generation-${project.version} - Writing YARN
- Applications
+ Hadoop Map Reduce Next Generation-${project.version} - Writing YARN
+ Applications
---
---
${maven.build.timestamp}
Hadoop MapReduce Next Generation - Writing YARN Applications
- \[ {{{./index.html}Go Back}} \]
-
%{toc|section=1|fromDepth=0}
-* Purpose
+* Purpose
- This document describes, at a high-level, the way to implement new
+ This document describes, at a high level, how to implement new
Applications for YARN.
* Concepts and Flow
- The general concept is that an 'Application Submission Client' submits an
- 'Application' to the YARN Resource Manager. The client communicates with the
- ResourceManager using the 'ApplicationClientProtocol' to first acquire a new
- 'ApplicationId' if needed via ApplicationClientProtocol#getNewApplication and then
- submit the 'Application' to be run via ApplicationClientProtocol#submitApplication. As
- part of the ApplicationClientProtocol#submitApplication call, the client needs to
- provide sufficient information to the ResourceManager to 'launch' the
- application's first container i.e. the ApplicationMaster.
- You need to provide information such as the details about the local
- files/jars that need to be available for your application to run, the actual
- command that needs to be executed (with the necessary command line arguments),
- any Unix environment settings (optional), etc. Effectively, you need to
- describe the Unix process(es) that needs to be launched for your
- ApplicationMaster.
-
- The YARN ResourceManager will then launch the ApplicationMaster (as specified)
- on an allocated container. The ApplicationMaster is then expected to
- communicate with the ResourceManager using the 'ApplicationMasterProtocol'. Firstly, the
- ApplicationMaster needs to register itself with the ResourceManager. To
- complete the task assigned to it, the ApplicationMaster can then request for
- and receive containers via ApplicationMasterProtocol#allocate. After a container is
- allocated to it, the ApplicationMaster communicates with the NodeManager using
- ContainerManager#startContainer to launch the container for its task. As part
- of launching this container, the ApplicationMaster has to specify the
- ContainerLaunchContext which, similar to the ApplicationSubmissionContext,
- has the launch information such as command line specification, environment,
- etc. Once the task is completed, the ApplicationMaster has to signal the
- ResourceManager of its completion via the ApplicationMasterProtocol#finishApplicationMaster.
-
- Meanwhile, the client can monitor the application's status by querying the
- ResourceManager or by directly querying the ApplicationMaster if it supports
- such a service. If needed, it can also kill the application via
- ApplicationClientProtocol#forceKillApplication.
+ The general concept is that an <application submission client> submits an
+ <application> to the YARN <ResourceManager> (RM). This can be done by
+ setting up a <<<YarnClient>>> object. After <<<YarnClient>>> is started, the
+ client can set up the application context, prepare the very first container
+ of the application, which contains the <ApplicationMaster> (AM), and then
+ submit the application. You need to provide information such as the details
+ about the local files/jars that need to be available for your application to
+ run, the actual command that needs to be executed (with the necessary command
+ line arguments), any OS environment settings (optional), etc. Effectively, you
+ need to describe the Unix process(es) that need to be launched for your
+ ApplicationMaster.
+
+ The YARN ResourceManager will then launch the ApplicationMaster (as
+ specified) on an allocated container. The ApplicationMaster communicates with
+ the YARN cluster and handles application execution. It performs operations in
+ an asynchronous fashion. During application launch time, the main tasks of the
+ ApplicationMaster are: a) communicating with the ResourceManager to negotiate
+ and allocate resources for future containers, and b) after container
+ allocation, communicating with YARN <NodeManager>s (NMs) to launch application
+ containers on them. Task a) can be performed asynchronously through an
+ <<<AMRMClientAsync>>> object, with event handling methods specified in a
+ <<<AMRMClientAsync.CallbackHandler>>> type of event handler. The event handler
+ needs to be passed to the client explicitly. Task b) can be performed by
+ launching a runnable object that then launches containers when there are
+ containers allocated. As part of launching each container, the AM has to
+ specify the <<<ContainerLaunchContext>>> that has the launch information such as
+ command line specification, environment, etc.
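The following is a minimal, stdlib-only sketch of task b): when the RM reports newly allocated containers, the AM hands each container to a runnable on a thread pool rather than launching it inside the callback. It uses hypothetical stand-in types (<<<AllocatedContainer>>>, a <<<launch>>> method that only records the container) so it runs without Hadoop on the classpath. The <<<onContainersAllocated>>> name mirrors <<<AMRMClientAsync.CallbackHandler#onContainersAllocated>>>; in a real AM, <<<launch>>> would build a <<<ContainerLaunchContext>>> and pass it to <<<NMClientAsync#startContainerAsync>>>.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class LaunchOnAllocateSketch {

  /** Hypothetical stand-in for YARN's Container record: just an id and a host. */
  static final class AllocatedContainer {
    final String id;
    final String nodeHost;

    AllocatedContainer(String id, String nodeHost) {
      this.id = id;
      this.nodeHost = nodeHost;
    }
  }

  private final ExecutorService launcherPool = Executors.newFixedThreadPool(4);
  // Written by several launcher threads, hence the synchronized wrapper.
  private final List<String> launched =
      Collections.synchronizedList(new ArrayList<String>());

  /** Simplified analogue of AMRMClientAsync.CallbackHandler#onContainersAllocated. */
  public void onContainersAllocated(List<AllocatedContainer> containers) {
    for (final AllocatedContainer c : containers) {
      // Keep the callback thread free: each launch runs in its own runnable.
      launcherPool.execute(new Runnable() {
        @Override
        public void run() {
          launch(c);
        }
      });
    }
  }

  private void launch(AllocatedContainer c) {
    // Hypothetical: a real AM would build a ContainerLaunchContext here and
    // hand it, with the container, to NMClientAsync#startContainerAsync.
    launched.add(c.id + "@" + c.nodeHost);
  }

  /** Stops accepting work, waits for pending launches, and returns them sorted. */
  public List<String> shutdownAndGetLaunched() {
    launcherPool.shutdown();
    try {
      launcherPool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    List<String> copy;
    synchronized (launched) {
      copy = new ArrayList<String>(launched);
    }
    Collections.sort(copy);
    return copy;
  }

  public static void main(String[] args) {
    LaunchOnAllocateSketch am = new LaunchOnAllocateSketch();
    List<AllocatedContainer> batch = new ArrayList<AllocatedContainer>();
    batch.add(new AllocatedContainer("container_02", "node-b"));
    batch.add(new AllocatedContainer("container_01", "node-a"));
    am.onContainersAllocated(batch);
    System.out.println(am.shutdownAndGetLaunched()); // [container_01@node-a, container_02@node-b]
  }
}
```

The design choice here is to keep callback methods short: slow work such as talking to a NodeManager is moved off the callback thread.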
+
+ During the execution of an application, the ApplicationMaster communicates
+ with NodeManagers through an <<<NMClientAsync>>> object. All container events
+ are handled by the <<<NMClientAsync.CallbackHandler>>> associated with the
+ <<<NMClientAsync>>>. A typical callback handler handles container start, stop,
+ status updates and errors. The ApplicationMaster also reports execution
+ progress to the ResourceManager by implementing the <<<getProgress()>>> method
+ of <<<AMRMClientAsync.CallbackHandler>>>.
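As an illustration of <<<getProgress()>>>, the sketch below assumes progress is defined as the fraction of requested containers that have completed. That definition is an assumption for this example, not a YARN rule; the RM only expects a value in the range 0 to 1. Completion events are counted with atomic counters because callbacks arrive on client-owned threads. The method names mirror <<<onContainersCompleted>>> and <<<getProgress>>> on <<<AMRMClientAsync.CallbackHandler>>>, but exit statuses are passed as plain integers instead of <<<ContainerStatus>>> records so the code runs without Hadoop.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public final class ProgressTrackerSketch {
  private final int totalContainers;
  private final AtomicInteger completed = new AtomicInteger();
  private final AtomicInteger failed = new AtomicInteger();

  public ProgressTrackerSketch(int totalContainers) {
    if (totalContainers <= 0) {
      throw new IllegalArgumentException("totalContainers must be positive");
    }
    this.totalContainers = totalContainers;
  }

  /** Simplified analogue of onContainersCompleted: one exit status per finished container. */
  public void onContainersCompleted(List<Integer> exitStatuses) {
    for (int exit : exitStatuses) {
      completed.incrementAndGet();
      if (exit != 0) {
        failed.incrementAndGet();
      }
    }
  }

  /** Analogue of getProgress(): clamped to [0, 1] in case of extra completions (e.g. retries). */
  public float getProgress() {
    return Math.min(1.0f, (float) completed.get() / totalContainers);
  }

  public int getFailedCount() {
    return failed.get();
  }

  public static void main(String[] args) {
    ProgressTrackerSketch tracker = new ProgressTrackerSketch(4);
    tracker.onContainersCompleted(Arrays.asList(0, 1));
    System.out.println(tracker.getProgress());    // 0.5
    System.out.println(tracker.getFailedCount()); // 1
  }
}
```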
+
+ In addition to the asynchronous clients, there are synchronous versions for
+ certain workflows (<<<AMRMClient>>> and <<<NMClient>>>). The asynchronous
+ clients are recommended because of their (subjectively) simpler usage, and
+ this article will mainly cover the asynchronous clients. Please refer to
+ <<<AMRMClient>>> and <<<NMClient>>> for more information on synchronous clients.
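To make the synchronous/asynchronous distinction concrete, here is an illustrative, stdlib-only model of how an asynchronous client can be layered over a blocking one: a heartbeat thread calls the blocking method and queues each result (or error), and a separate dispatcher thread hands them to a callback handler in order. <<<SyncClient>>>, <<<Handler>>> and the String responses are hypothetical; this is a simplified model of the idea, not the actual implementation of <<<AMRMClientAsync>>>.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public final class AsyncOverSyncSketch {

  /** Hypothetical blocking call, standing in for something like AMRMClient#allocate. */
  interface SyncClient {
    String heartbeat(int round) throws Exception;
  }

  /** Hypothetical callback interface, standing in for a CallbackHandler. */
  interface Handler {
    void onResponse(String response);

    void onError(Throwable t);
  }

  /** Runs `rounds` blocking heartbeats on one thread and dispatches results on another. */
  static void runRounds(final SyncClient client, final Handler handler, final int rounds) {
    // Unbounded queue, so offer() never blocks and never fails.
    final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
    final Object done = new Object(); // sentinel marking the end of the stream

    Thread heartbeat = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          for (int i = 0; i < rounds; i++) {
            Object result;
            try {
              result = client.heartbeat(i);
            } catch (Exception e) {
              result = e; // errors travel through the same queue as responses
            }
            queue.offer(result);
          }
        } finally {
          queue.offer(done);
        }
      }
    });

    Thread dispatcher = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          while (true) {
            Object item = queue.take();
            if (item == done) {
              return;
            }
            if (item instanceof Throwable) {
              handler.onError((Throwable) item);
            } else {
              handler.onResponse((String) item);
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });

    heartbeat.start();
    dispatcher.start();
    try {
      heartbeat.join();
      dispatcher.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    final List<String> seen = new CopyOnWriteArrayList<String>();
    runRounds(new SyncClient() {
      @Override
      public String heartbeat(int round) throws Exception {
        if (round == 1) {
          throw new Exception("RM unreachable");
        }
        return "response-" + round;
      }
    }, new Handler() {
      @Override
      public void onResponse(String response) {
        seen.add(response);
      }

      @Override
      public void onError(Throwable t) {
        seen.add("error: " + t.getMessage());
      }
    }, 3);
    System.out.println(seen); // [response-0, error: RM unreachable, response-2]
  }
}
```

Because errors travel through the same queue as responses, the handler sees them in the order they occurred, and user code never blocks on the heartbeat call itself.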
-* Interfaces
+* Interfaces
The interfaces you are most likely to be concerned with are:
- * ApplicationClientProtocol - Client\<--\>ResourceManager\
- The protocol for a client that wishes to communicate with the
- ResourceManager to launch a new application (i.e. the ApplicationMaster),
- check on the status of the application or kill the application. For example,
- a job-client (a job launching program from the gateway) would use this
- protocol.
+ * <<Client>>\<--\><<ResourceManager>>\
+ By using <<<YarnClient>>> objects.
+
+ * <<ApplicationMaster>>\<--\><<ResourceManager>>\
+ By using <<<AMRMClientAsync>>> objects, handling events asynchronously by
+ <<<AMRMClientAsync.CallbackHandler>>>.
+
+ * <<ApplicationMaster>>\<--\><<NodeManager>>\
+ Launch containers. Communicate with NodeManagers
+ by using <<<NMClientAsync>>> objects, handling container events by
+ <<<NMClientAsync.CallbackHandler>>>.
+
+ []
+
+ <<Note>>
- * ApplicationMasterProtocol - ApplicationMaster\<--\>ResourceManager\
- The protocol used by the ApplicationMaster to register/unregister itself
- to/from the ResourceManager as well as to request for resources from the
- Scheduler to complete its tasks.
-
- * ContainerManager - ApplicationMaster\<--\>NodeManager\
- The protocol used by the ApplicationMaster to talk to the NodeManager to
- start/stop containers and get status updates on the containers if needed.
+ * The three main protocols for YARN applications (ApplicationClientProtocol,
+ ApplicationMasterProtocol and ContainerManagementProtocol) are still
+ preserved. The three clients wrap these three protocols to provide a simpler
+ programming model for YARN applications.
+
+ * Under very rare circumstances, programmers may want to use the three
+ protocols directly to implement an application. However, note that <such
+ behaviors are no longer encouraged for general use cases>.
+
+ []
* Writing a Simple Yarn Application
** Writing a simple Client
- * The first step that a client needs to do is to connect to the
- ResourceManager or to be more specific, the ApplicationsManager (AsM)
- interface of the ResourceManager.
-
-+---+
- ApplicationClientProtocol applicationsManager;
- YarnConfiguration yarnConf = new YarnConfiguration(conf);
- InetSocketAddress rmAddress =
- NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS));
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- Configuration appsManagerServerConf = new Configuration(conf);
- appsManagerServerConf.setClass(
- YarnConfiguration.YARN_SECURITY_INFO,
- ClientRMSecurityInfo.class, SecurityInfo.class);
- applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
- ApplicationClientProtocol.class, rmAddress, appsManagerServerConf));
-+---+
-
- * Once a handle is obtained to the ASM, the client needs to request the
- ResourceManager for a new ApplicationId.
-
-+---+
- GetNewApplicationRequest request =
- Records.newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse response =
- applicationsManager.getNewApplication(request);
- LOG.info("Got new ApplicationId=" + response.getApplicationId());
-+---+
-
- * The response from the ASM for a new application also contains information
- about the cluster such as the minimum/maximum resource capabilities of the
- cluster. This is required to ensure that you can correctly set the
- specifications of the container in which the ApplicationMaster would be
- launched. Please refer to GetNewApplicationResponse for more details.
-
- * The main crux of a client is to setup the ApplicationSubmissionContext
- which defines all the information needed by the ResourceManager to launch
- the ApplicationMaster. A client needs to set the following into the context:
-
- * Application Info: id, name
-
- * Queue, Priority info: Queue to which the application will be submitted,
- the priority to be assigned for the application.
+ * The first thing a client needs to do is initialize and start a
+ <<<YarnClient>>>.
- * User: The user submitting the application
++---+
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
++---+
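<<<YarnClient>>> extends Hadoop's <<<AbstractService>>>, so it follows the service lifecycle in which <<<init()>>> must precede <<<start()>>>, and <<<stop()>>> should be called once the client is no longer needed. The sketch below is a simplified, stdlib-only model of that ordering rule so it can run without Hadoop; the state names mirror Hadoop's <<<Service.STATE>>>, but this is not <<<AbstractService>>> itself, and the real class reports illegal transitions with its own exception type.

```java
public final class ServiceLifecycleSketch {

  /** Mirrors the four states of Hadoop's Service.STATE. */
  enum State { NOTINITED, INITED, STARTED, STOPPED }

  private State state = State.NOTINITED;

  public synchronized void init() {
    if (state == State.INITED) {
      return; // repeated init is a no-op
    }
    requireState(State.NOTINITED, "init");
    state = State.INITED;
  }

  public synchronized void start() {
    if (state == State.STARTED) {
      return; // repeated start is a no-op
    }
    requireState(State.INITED, "start");
    state = State.STARTED;
  }

  /** Stopping is allowed from any state. */
  public synchronized void stop() {
    state = State.STOPPED;
  }

  public synchronized State getState() {
    return state;
  }

  private void requireState(State expected, String op) {
    if (state != expected) {
      throw new IllegalStateException("cannot " + op + " in state " + state);
    }
  }

  public static void main(String[] args) {
    ServiceLifecycleSketch client = new ServiceLifecycleSketch();
    try {
      client.start(); // start() before init() is rejected
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // cannot start in state NOTINITED
    }
    client.init();
    client.start();
    client.stop(); // release resources when the client is no longer needed
    System.out.println(client.getState()); // STOPPED
  }
}
```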
- * ContainerLaunchContext: The information defining the container in which
- the ApplicationMaster will be launched and run. The
- ContainerLaunchContext, as mentioned previously, defines all the required
- information needed to run the ApplicationMaster such as the local
- resources (binaries, jars, files etc.), security tokens, environment
- settings (CLASSPATH etc.) and the command to be executed.
-
- []
-
-+---+
- // Create a new ApplicationSubmissionContext
- ApplicationSubmissionContext appContext =
- Records.newRecord(ApplicationSubmissionContext.class);
- // set the ApplicationId
- appContext.setApplicationId(appId);
- // set the application name
- appContext.setApplicationName(appName);
-
- // Create a new container launch context for the AM's container
- ContainerLaunchContext amContainer =
- Records.newRecord(ContainerLaunchContext.class);
-
- // Define the local resources required
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
- // Lets assume the jar we need for our ApplicationMaster is available in
- // HDFS at a certain known path to us and we want to make it available to
- // the ApplicationMaster in the launched container
- Path jarPath; // <- known path to jar file
- FileStatus jarStatus = fs.getFileStatus(jarPath);
- LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
- // Set the type of resource - file or archive
- // archives are untarred at the destination by the framework
- amJarRsrc.setType(LocalResourceType.FILE);
- // Set visibility of the resource
- // Setting to most private option i.e. this file will only
- // be visible to this instance of the running application
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- // Set the location of resource to be copied over into the
- // working directory
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- // Set timestamp and length of file so that the framework
- // can do basic sanity checks for the local resource
- // after it has been copied over to ensure it is the same
- // resource the client intended to use with the application
- amJarRsrc.setTimestamp(jarStatus.getModificationTime());
- amJarRsrc.setSize(jarStatus.getLen());
- // The framework will create a symlink called AppMaster.jar in the
- // working directory that will be linked back to the actual file.
- // The ApplicationMaster, if needs to reference the jar file, would
- // need to use the symlink filename.
- localResources.put("AppMaster.jar", amJarRsrc);
- // Set the local resources into the launch context
- amContainer.setLocalResources(localResources);
-
- // Set up the environment needed for the launch context
- Map<String, String> env = new HashMap<String, String>();
- // For example, we could setup the classpath needed.
- // Assuming our classes or jars are available as local resources in the
- // working directory from which the command will be run, we need to append
- // "." to the path.
- // By default, all the hadoop specific classpaths will already be available
- // in $CLASSPATH, so we should be careful not to overwrite it.
- String classPathEnv = "$CLASSPATH:./*:";
- env.put("CLASSPATH", classPathEnv);
- amContainer.setEnvironment(env);
-
- // Construct the command to be executed on the launched container
- String command =
- "${JAVA_HOME}" + "/bin/java" +
- " MyAppMaster" +
- " arg1 arg2 arg3" +
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
-
- List<String> commands = new ArrayList<String>();
- commands.add(command);
- // add additional commands if needed
+ * Once the client is set up, it needs to create an application and get
+ its application id.
- // Set the command array into the container spec
- amContainer.setCommands(commands);
-
- // Define the resource requirements for the container
- // For now, YARN only supports memory so we set the memory
- // requirements.
- // If the process takes more than its allocated memory, it will
- // be killed by the framework.
- // Memory being requested for should be less than max capability
- // of the cluster and all asks should be a multiple of the min capability.
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(amMemory);
- amContainer.setResource(capability);
-
- // Set the container launch content into the ApplicationSubmissionContext
- appContext.setAMContainerSpec(amContainer);
++---+
+ YarnClientApplication app = yarnClient.createApplication();
+ GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+---+
- * After the setup process is complete, the client is finally ready to submit
- the application to the ASM.
-
-+---+
- // Create the request to send to the ApplicationsManager
- SubmitApplicationRequest appRequest =
- Records.newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
-
- // Submit the application to the ApplicationsManager
- // Ignore the response as either a valid response object is returned on
- // success or an exception thrown to denote the failure
- applicationsManager.submitApplication(appRequest);
-+---+
-
- * At this point, the ResourceManager will have accepted the application and
- in the background, will go through the process of allocating a container
- with the required specifications and then eventually setting up and
- launching the ApplicationMaster on the allocated container.
-
- * There are multiple ways a client can track progress of the actual task.
-
- * It can communicate with the ResourceManager and request for a report of
- the application via ApplicationClientProtocol#getApplicationReport.
+ * The response from the <<<YarnClientApplication>>> for a new application also
+ contains information about the cluster, such as the maximum resource
+ capability of the cluster. This information is needed to ensure that you can
+ correctly set the specifications of the container in which the
+ ApplicationMaster will be launched. Please refer to
+ <<<GetNewApplicationResponse>>> for more details.
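A client typically uses the cluster maximum to keep its AM request within bounds before submitting. In the real API the limits come from <<<appResponse.getMaximumResourceCapability()>>>, a <<<Resource>>> exposing <<<getMemory()>>> and <<<getVirtualCores()>>>; the sketch below uses plain ints with hypothetical values so it runs without Hadoop on the classpath. Capping the request (rather than failing) is one possible policy, assumed here for illustration, not a YARN requirement.

```java
public final class AmResourceClampSketch {

  /** Returns the requested amount, capped at the cluster maximum. */
  public static int capAtMax(String what, int requested, int clusterMax) {
    if (requested <= 0) {
      throw new IllegalArgumentException("invalid " + what + " requested: " + requested);
    }
    if (requested > clusterMax) {
      System.out.println("Requested AM " + what + " (" + requested
          + ") exceeds the cluster maximum; using " + clusterMax);
      return clusterMax;
    }
    return requested;
  }

  public static void main(String[] args) {
    // Hypothetical limits; a real client would read them from
    // appResponse.getMaximumResourceCapability().
    int maxMemoryMb = 8192;
    int maxVCores = 4;
    int amMemoryMb = capAtMax("memory", 10240, maxMemoryMb);
    int amVCores = capAtMax("vcores", 1, maxVCores);
    System.out.println(amMemoryMb + " MB, " + amVCores + " vcores"); // 8192 MB, 1 vcores
  }
}
```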
+
+ * The crux of a client is to set up the <<<ApplicationSubmissionContext>>>,
+ which defines all the information needed by the RM to launch the AM. A client
+ needs to set the following into the context:
+
+ * Application info: id, name
+
+ * Queue, priority info: Queue to which the application will be submitted,
+ the priority to be assigned for the application.
+
+ * User: The user submitting the application
+
+ * <<<ContainerLaunchContext>>>: The information defining the container in
+ which the AM will be launched and run. The <<<ContainerLaunchContext>>>, as
+ mentioned previously, defines all the required information needed to run
+ the application such as the local <<R>>esources (binaries, jars, files
+ etc.), <<E>>nvironment settings (CLASSPATH etc.), the <<C>>ommand to be
+ executed and security <<T>>okens (<RECT>).
+
+ []
-+-----+
- GetApplicationReportRequest reportRequest =
- Records.newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(appId);
- GetApplicationReportResponse reportResponse =
- applicationsManager.getApplicationReport(reportRequest);
- ApplicationReport report = reportResponse.getApplicationReport();
-+-----+
-
- The ApplicationReport received from the ResourceManager consists of the following:
-
- * General application information: ApplicationId, queue to which the
- application was submitted, user who submitted the application and the
- start time for the application.
-
- * ApplicationMaster details: the host on which the ApplicationMaster is
- running, the rpc port (if any) on which it is listening for requests
- from clients and a token that the client needs to communicate with
- the ApplicationMaster.
-
- * Application tracking information: If the application supports some
- form of progress tracking, it can set a tracking url which is
- available via ApplicationReport#getTrackingUrl that a client can look
- at to monitor progress.
-
- * ApplicationStatus: The state of the application as seen by the
- ResourceManager is available via
- ApplicationReport#getYarnApplicationState. If the
- YarnApplicationState is set to FINISHED, the client should refer to
- ApplicationReport#getFinalApplicationStatus to check for the actual
- success/failure of the application task itself. In case of failures,
- ApplicationReport#getDiagnostics may be useful to shed some more
- light on the failure.
-
- * If the ApplicationMaster supports it, a client can directly query the
- ApplicationMaster itself for progress updates via the host:rpcport
- information obtained from the ApplicationReport. It can also use the
- tracking url obtained from the report if available.
-
- * In certain situations, if the application is taking too long or due to
- other factors, the client may wish to kill the application. The
- ApplicationClientProtocol supports the forceKillApplication call that allows a
- client to send a kill signal to the ApplicationMaster via the
- ResourceManager. An ApplicationMaster if so designed may also support an
- abort call via its rpc layer that a client may be able to leverage.
-
-+---+
- KillApplicationRequest killRequest =
- Records.newRecord(KillApplicationRequest.class);
- killRequest.setApplicationId(appId);
- applicationsManager.forceKillApplication(killRequest);
-+---+
-
-** Writing an ApplicationMaster
-
- * The ApplicationMaster is the actual owner of the job. It will be launched
- by the ResourceManager and via the client will be provided all the necessary
- information and resources about the job that it has been tasked with to
- oversee and complete.
-
- * As the ApplicationMaster is launched within a container that may (likely
- will) be sharing a physical host with other containers, given the
- multi-tenancy nature, amongst other issues, it cannot make any assumptions
- of things like pre-configured ports that it can listen on.
-
- * When the ApplicationMaster starts up, several parameters are made available
- to it via the environment. These include the ContainerId for the
- ApplicationMaster container, the application submission time and details
- about the NodeManager host running the Application Master.
- Ref ApplicationConstants for parameter names.
-
- * All interactions with the ResourceManager require an ApplicationAttemptId
- (there can be multiple attempts per application in case of failures). The
- ApplicationAttemptId can be obtained from the ApplicationMaster
- containerId. There are helper apis to convert the value obtained from the
- environment into objects.
-
+---+
- Map<String, String> envs = System.getenv();
- String containerIdString =
- envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
- if (containerIdString == null) {
- // container id should always be set in the env by the framework
- throw new IllegalArgumentException(
- "ContainerId not set in the environment");
+ // set the application submission context
+ ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+ ApplicationId appId = appContext.getApplicationId();
+
+ appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
+ appContext.setApplicationName(appName);
+
+ // set local resources for the application master
+ // local files or archives as needed
+ // In this scenario, the jar file for the application master is part of the local resources
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+ LOG.info("Copy App Master jar from local filesystem and add to local environment");
+ // Copy the application master jar to the filesystem
+ // Create a local resource to point to the destination jar path
+ FileSystem fs = FileSystem.get(conf);
+ addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
+ localResources, null);
+
+ // Set the log4j properties if needed
+ if (!log4jPropFile.isEmpty()) {
+ addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
+ localResources, null);
+ }
+
+ // The shell script has to be made available on the final container(s)
+ // where it will be executed.
+ // To do this, we need to first copy into the filesystem that is visible
+ // to the yarn framework.
+ // We do not need to set this as a local resource for the application
+ // master as the application master does not need it.
+ String hdfsShellScriptLocation = "";
+ long hdfsShellScriptLen = 0;
+ long hdfsShellScriptTimestamp = 0;
+ if (!shellScriptPath.isEmpty()) {
+ Path shellSrc = new Path(shellScriptPath);
+ String shellPathSuffix =
+ appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
+ Path shellDst =
+ new Path(fs.getHomeDirectory(), shellPathSuffix);
+ fs.copyFromLocalFile(false, true, shellSrc, shellDst);
+ hdfsShellScriptLocation = shellDst.toUri().toString();
+ FileStatus shellFileStatus = fs.getFileStatus(shellDst);
+ hdfsShellScriptLen = shellFileStatus.getLen();
+ hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
+ }
+
+ if (!shellCommand.isEmpty()) {
+ addToLocalResources(fs, null, shellCommandPath, appId.toString(),
+ localResources, shellCommand);
+ }
+
+ if (shellArgs.length > 0) {
+ addToLocalResources(fs, null, shellArgsPath, appId.toString(),
+ localResources, StringUtils.join(shellArgs, " "));
+ }
+
+ // Set the env variables to be setup in the env where the application master will be run
+ LOG.info("Set the environment for the application master");
+ Map<String, String> env = new HashMap<String, String>();
+
+ // put location of shell script into env
+ // using the env info, the application master will create the correct local resource for the
+ // eventual containers that will be launched to execute the shell scripts
+ env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
+ env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
+ env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
+
+ // Add AppMaster.jar location to classpath
+ // At some point we should not be required to add
+ // the hadoop specific classpaths to the env.
+ // It should be provided out of the box.
+ // For now setting all required classpaths including
+ // the classpath to "." for the application jar
+ StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
+ .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
+ classPathEnv.append(c.trim());
+ }
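+ classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
+ "./log4j.properties");
+
+ // Put the classpath built above into the AM's environment
+ env.put("CLASSPATH", classPathEnv.toString());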
+
+ // Set the necessary command to execute the application master
+ Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+ // Set java executable command
+ LOG.info("Setting up app master command");
+ vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
+ // Set Xmx based on am memory size
+ vargs.add("-Xmx" + amMemory + "m");
+ // Set class name
+ vargs.add(appMasterMainClass);
+ // Set params for Application Master
+ vargs.add("--container_memory " + String.valueOf(containerMemory));
+ vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
+ vargs.add("--num_containers " + String.valueOf(numContainers));
+ vargs.add("--priority " + String.valueOf(shellCmdPriority));
+
+ for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
+ vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
+ }
+ if (debugFlag) {
+ vargs.add("--debug");
+ }
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+ // Get final command
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ LOG.info("Completed setting up app master command " + command.toString());
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
+ localResources, env, commands, null, null, null);
+
+ // Set up resource type requirements
+ // For now, both memory and vcores are supported, so we set memory and
+ // vcores requirements
+ Resource capability = Resource.newInstance(amMemory, amVCores);
+ appContext.setResource(capability);
+
+ // Service data is a binary blob that can be passed to the application
+ // Not needed in this scenario
+ // amContainer.setServiceData(serviceData);
+
+ // Setup security tokens
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
+ Credentials credentials = new Credentials();
+ String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
+ if (tokenRenewer == null || tokenRenewer.length() == 0) {
+ throw new IOException(
+ "Can't get Master Kerberos principal for the RM to use as renewer");
}
- ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
- ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
-+---+
-
- * After an ApplicationMaster has initialized itself completely, it needs to
- register with the ResourceManager via
- ApplicationMasterProtocol#registerApplicationMaster. The ApplicationMaster always
- communicates via the Scheduler interface of the ResourceManager.
-
+
+ // For now, only getting tokens for the default file-system.
+ final Token<?> tokens[] =
+ fs.addDelegationTokens(tokenRenewer, credentials);
+ if (tokens != null) {
+ for (Token<?> token : tokens) {
+ LOG.info("Got dt for " + fs.getUri() + "; " + token);
+ }
+ }
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ amContainer.setTokens(fsTokens);
+ }
+
+ appContext.setAMContainerSpec(amContainer);
+---+
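The AM command built above is a single string in which YARN expands placeholders when the container is launched. The stdlib-only sketch below reproduces just the string-building step so the result can be inspected without a cluster. The literal tokens <<<\{\{JAVA_HOME\}\}>>>, <<<\<LOG_DIR\>>>> and <<<\<CPS\>>>> are assumptions: to our understanding they are what <<<Environment.JAVA_HOME.$$()>>>, <<<ApplicationConstants.LOG_DIR_EXPANSION_VAR>>> and <<<ApplicationConstants.CLASS_PATH_SEPARATOR>>> evaluate to in Hadoop 2.4+, so check them against your version. The class name <<<com.example.MyAppMaster>>> is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public final class AmCommandSketch {
  // Assumed placeholder tokens (Hadoop 2.4+); the NodeManager expands them at launch.
  static final String JAVA_HOME = "{{JAVA_HOME}}";
  static final String LOG_DIR = "<LOG_DIR>";
  static final String CPS = "<CPS>";

  /** Builds the AM launch command like the vargs loop above, without the trailing space. */
  public static String buildCommand(String mainClass, int amMemoryMb, List<String> appArgs) {
    List<String> vargs = new ArrayList<String>();
    vargs.add(JAVA_HOME + "/bin/java");
    vargs.add("-Xmx" + amMemoryMb + "m");
    vargs.add(mainClass);
    vargs.addAll(appArgs);
    vargs.add("1>" + LOG_DIR + "/AppMaster.stdout");
    vargs.add("2>" + LOG_DIR + "/AppMaster.stderr");
    return String.join(" ", vargs);
  }

  /** Joins classpath entries with the cross-platform separator placeholder. */
  public static String buildClasspath(List<String> entries) {
    return String.join(CPS, entries);
  }

  public static void main(String[] args) {
    List<String> appArgs = new ArrayList<String>();
    appArgs.add("--num_containers 2");
    System.out.println(buildCommand("com.example.MyAppMaster", 512, appArgs));
    // {{JAVA_HOME}}/bin/java -Xmx512m com.example.MyAppMaster --num_containers 2 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
  }
}
```

Keeping the placeholders unexpanded on the client side is what lets the same command run on NodeManagers whose Java installation, log directory and path separator differ from the submitting machine.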
- // Connect to the Scheduler of the ResourceManager.
- YarnConfiguration yarnConf = new YarnConfiguration(conf);
- InetSocketAddress rmAddress =
- NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- ApplicationMasterProtocol resourceManager =
- (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
-
- // Register the AM with the RM
- // Set the required info into the registration request:
- // ApplicationAttemptId,
- // host on which the app master is running
- // rpc port on which the app master accepts requests from the client
- // tracking url for the client to track app master progress
- RegisterApplicationMasterRequest appMasterRequest =
- Records.newRecord(RegisterApplicationMasterRequest.class);
- appMasterRequest.setApplicationAttemptId(appAttemptID);
- appMasterRequest.setHost(appMasterHostname);
- appMasterRequest.setRpcPort(appMasterRpcPort);
- appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-
- // The registration response is useful as it provides information about the
- // cluster.
- // Similar to the GetNewApplicationResponse in the client, it provides
- // information about the min/max resource capabilities of the cluster that
- // would be needed by the ApplicationMaster when requesting for containers.
- RegisterApplicationMasterResponse response =
- resourceManager.registerApplicationMaster(appMasterRequest);
-+---+
-
- * The ApplicationMaster has to emit heartbeats to the ResourceManager to keep
- it informed that the ApplicationMaster is alive and still running. The
- timeout expiry interval at the ResourceManager is defined by a config
- setting accessible via YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS with the
- default being defined by YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS.
- The ApplicationMasterProtocol#allocate calls to the ResourceManager count as heartbeats
- as it also supports sending progress update information. Therefore, an
- allocate call with no containers requested and progress information updated
- if any is a valid way for making heartbeat calls to the ResourceManager.
-
- * Based on the task requirements, the ApplicationMaster can ask for a set of
- containers to run its tasks on. The ApplicationMaster has to use the
- ResourceRequest class to define the following container specifications:
-
- * Hostname: If containers are required to be hosted on a particular rack or
- a specific host. '*' is a special value that implies any host will do.
-
- * Resource capability: Currently, YARN only supports memory based resource
- requirements so the request should define how much memory is needed. The
- value is defined in MB and has to less than the max capability of the
- cluster and an exact multiple of the min capability. Memory resources
- correspond to physical memory limits imposed on the task containers.
-
- * Priority: When asking for sets of containers, an ApplicationMaster may
- define different priorities to each set. For example, the Map-Reduce
- ApplicationMaster may assign a higher priority to containers needed
- for the Map tasks and a lower priority for the Reduce tasks' containers.
-
- []
-
-+----+
- // Resource Request
- ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
- // setup requirements for hosts
- // whether a particular rack/host is needed
- // useful for applications that are sensitive
- // to data locality
- rsrcRequest.setHostName("*");
- // set the priority for the request
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(requestPriority);
- rsrcRequest.setPriority(pri);
+ * After the setup process is complete, the client is ready to submit
+ the application with the specified priority and queue.
- // Set up resource type requirements
- // For now, only memory is supported so we set memory requirements
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(containerMemory);
- rsrcRequest.setCapability(capability);
-
- // set no. of containers needed
- // matching the specifications
- rsrcRequest.setNumContainers(numContainers);
-+---+
-
- * After defining the container requirements, the ApplicationMaster has to
- construct an AllocateRequest to send to the ResourceManager.
- The AllocateRequest consists of:
-
- * Requested containers: The container specifications and the no. of
- containers being requested for by the ApplicationMaster from the
- ResourceManager.
-
- * Released containers: There may be situations when the ApplicationMaster
- may have requested for more containers that it needs or due to failure
- issues, decide to use other containers allocated to it. In all such
- situations, it is beneficial to the cluster if the ApplicationMaster
- releases these containers back to the ResourceManager so that they can be
- re-allocated to other applications.
-
- * ResponseId: The response id that will be sent back in the response from
- the allocate call.
-
- * Progress update information: The ApplicationMaster can send its progress
- update (range between to 0 to 1) to the ResourceManager.
-
- []
-
+---+
- List<ResourceRequest> requestedContainers;
- List<ContainerId> releasedContainers
- AllocateRequest req = Records.newRecord(AllocateRequest.class);
-
- // The response id set in the request will be sent back in
- // the response so that the ApplicationMaster can
- // match it to its original ask and act appropriately.
- req.setResponseId(rmRequestID);
-
- // Set ApplicationAttemptId
- req.setApplicationAttemptId(appAttemptID);
-
- // Add the list of containers being asked for
- req.addAllAsks(requestedContainers);
-
- // If the ApplicationMaster has no need for certain
- // containers due to over-allocation or for any other
- // reason, it can release them back to the ResourceManager
- req.addAllReleases(releasedContainers);
-
- // Assuming the ApplicationMaster can track its progress
- req.setProgress(currentProgress);
-
- AllocateResponse allocateResponse = resourceManager.allocate(req);
+ // Set the priority for the application master
+ Priority pri = Priority.newInstance(amPriority);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue(amQueue);
+
+ // Submit the application to the ResourceManager
+ yarnClient.submitApplication(appContext);
+---+
-
- * The AllocateResponse sent back from the ResourceManager provides the
- following information:
-
- * Reboot flag: For scenarios when the ApplicationMaster may get out of sync
- with the ResourceManager.
-
- * Allocated containers: The containers that have been allocated to the
- ApplicationMaster.
-
- * Headroom: Headroom for resources in the cluster. Based on this information
- and knowing its needs, an ApplicationMaster can make intelligent decisions
- such as re-prioritizing sub-tasks to take advantage of currently allocated
- containers, bailing out faster if resources are not becoming available
- etc.
-
- * Completed containers: Once an ApplicationMaster triggers a launch an
- allocated container, it will receive an update from the ResourceManager
- when the container completes. The ApplicationMaster can look into the
- status of the completed container and take appropriate actions such as
- re-trying a particular sub-task in case of a failure.
-
- * Number of cluster nodes: The number of hosts available on the cluster.
-
- []
-
- One thing to note is that containers will not be immediately allocated to
- the ApplicationMaster. This does not imply that the ApplicationMaster should
- keep on asking the pending count of required containers. Once an allocate
- request has been sent, the ApplicationMaster will eventually be allocated
- the containers based on cluster capacity, priorities and the scheduling
- policy in place. The ApplicationMaster should only request for containers
- again if and only if its original estimate changed and it needs additional
- containers.
+ * At this point, the RM will have accepted the application and in the
+ background, will go through the process of allocating a container with the
+ required specifications and then eventually setting up and launching the AM
+ on the allocated container.
+
+ * There are multiple ways a client can track progress of the actual task.
+
+ * It can communicate with the RM and request a report of the application
+ via the <<<getApplicationReport()>>> method of <<<YarnClient>>>.
+
++-----+
+ // Get application report for the appId we are interested in
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
++-----+
+
+ The <<<ApplicationReport>>> received from the RM consists of the following:
+
+ * General application information: Application id, queue to which the
+ application was submitted, user who submitted the application and the
+ start time for the application.
+
+ * ApplicationMaster details: the host on which the AM is running, the
+ RPC port (if any) on which it is listening for requests from clients
+ and a token that the client needs to communicate with the AM.
+
+ * Application tracking information: If the application supports some form
+ of progress tracking, it can set a tracking URL, available via
+ <<<ApplicationReport>>>'s <<<getTrackingUrl()>>> method, that a client
+ can look at to monitor progress.
+
+ * Application status: The state of the application as seen by the
+ ResourceManager is available via
+ <<<ApplicationReport#getYarnApplicationState>>>. If the
+ <<<YarnApplicationState>>> is set to <<<FINISHED>>>, the client should
+ refer to <<<ApplicationReport#getFinalApplicationStatus>>> to check for
+ the actual success/failure of the application task itself. In case of
+ failures, <<<ApplicationReport#getDiagnostics>>> may be useful to shed
+ some more light on the failure.
+
+ * If the ApplicationMaster supports it, a client can directly query the AM
+ itself for progress updates via the host:rpcport information obtained from
+ the application report. It can also use the tracking URL obtained from the
+ report if available.
+
+ * In certain situations, if the application is taking too long or due to other
+ factors, the client may wish to kill the application. <<<YarnClient>>>
+ supports the <<<killApplication>>> call that allows a client to send a kill
+ signal to the AM via the ResourceManager. An ApplicationMaster, if so
+ designed, may also support an abort call via its RPC layer that a client may
+ be able to leverage.
+
++---+
+ yarnClient.killApplication(appId);
+---+
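Tracking and killing an application usually come together in a client-side polling loop. The sketch below is illustrative only. It uses hypothetical stand-in enums that mirror <<<YarnApplicationState>>> and <<<FinalApplicationStatus>>> rather than the YARN types, so that it runs without Hadoop on the classpath. It treats FINISHED, FAILED and KILLED as terminal and reports success only when the state is FINISHED and the final status is SUCCEEDED. A TIMED_OUT result marks the point where a real client might call <<<killApplication>>>.

```java
import java.util.EnumSet;
import java.util.Iterator;

// Hypothetical stand-ins that mirror YarnApplicationState and
// FinalApplicationStatus; they are not the YARN types themselves.
enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

enum FinalStatus { UNDEFINED, SUCCEEDED, FAILED, KILLED }

enum Outcome { SUCCEEDED, FAILED, TIMED_OUT }

// One polled snapshot of an application report (hypothetical type).
final class ReportSnapshot {
  final AppState state;
  final FinalStatus finalStatus;

  ReportSnapshot(AppState state, FinalStatus finalStatus) {
    this.state = state;
    this.finalStatus = finalStatus;
  }
}

final class AppMonitor {
  private static final EnumSet<AppState> TERMINAL =
      EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

  /**
   * Consumes report snapshots until a terminal state is seen or maxPolls
   * snapshots have been examined. TIMED_OUT tells the caller it may want
   * to kill the application.
   */
  static Outcome monitor(Iterator<ReportSnapshot> reports, int maxPolls) {
    for (int polls = 0; polls < maxPolls && reports.hasNext(); polls++) {
      ReportSnapshot report = reports.next();
      if (TERMINAL.contains(report.state)) {
        // FINISHED only means the AM exited; the final status tells us
        // whether the application's own work succeeded.
        boolean succeeded = report.state == AppState.FINISHED
            && report.finalStatus == FinalStatus.SUCCEEDED;
        return succeeded ? Outcome.SUCCEEDED : Outcome.FAILED;
      }
      // A real client would sleep here before fetching the next report.
    }
    return Outcome.TIMED_OUT;
  }
}
```

In a real client, each snapshot would come from one <<<getApplicationReport(appId)>>> call, with a sleep between calls.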
- // Retrieve list of allocated containers from the response
- // and on each allocated container, lets assume we are launching
- // the same job.
- List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
+** Writing an ApplicationMaster (AM)
+
+ * The AM is the actual owner of the job. It is launched by the RM and,
+ via the client, is provided with all the information and resources it
+ needs to oversee and complete the job.
+
+ * Because the AM is launched within a container that may (and likely will)
+ share a physical host with other containers in a multi-tenant cluster, it
+ cannot make assumptions about things like pre-configured ports that it can
+ listen on.
+
+ * When the AM starts up, several parameters are made available
+ to it via the environment. These include the <<<ContainerId>>> for the
+ AM container, the application submission time and details
+ about the NM (NodeManager) host running the ApplicationMaster.
+ Refer to <<<ApplicationConstants>>> for the parameter names.
+
+ * All interactions with the RM require an <<<ApplicationAttemptId>>> (there can
+ be multiple attempts per application in case of failures). The
+ <<<ApplicationAttemptId>>> can be obtained from the AM's container id. There
+ are helper APIs to convert the value obtained from the environment into
+ objects.
+
++---+
+ Map<String, String> envs = System.getenv();
+ String containerIdString =
+ envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+ if (containerIdString == null) {
+ // container id should always be set in the env by the framework
+ throw new IllegalArgumentException(
+ "ContainerId not set in the environment");
+ }
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+ ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
++---+
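The helper APIs above hide how an attempt ID relates to a container ID. The plain-Java sketch below makes that relationship visible. It is not <<<ConverterUtils>>>, and it assumes the common layout <<<container_<clusterTimestamp>_<appId>_<attemptId>_<containerNum>>>>. It also renders the attempt in the <<<appattempt_...>>> form. Other variants of the ID format are deliberately rejected, so use the YARN helpers in real code.

```java
// Plain-Java sketch of the container ID layout; this is NOT ConverterUtils.
// Assumed layout: container_<clusterTimestamp>_<appId>_<attemptId>_<containerNum>
final class ContainerIdParts {
  final long clusterTimestamp;
  final int appId;
  final int attemptId;
  final long containerNum;

  private ContainerIdParts(long clusterTimestamp, int appId, int attemptId,
      long containerNum) {
    this.clusterTimestamp = clusterTimestamp;
    this.appId = appId;
    this.attemptId = attemptId;
    this.containerNum = containerNum;
  }

  static ContainerIdParts parse(String containerId) {
    String[] fields = containerId.split("_");
    if (fields.length != 5 || !"container".equals(fields[0])) {
      throw new IllegalArgumentException("Unexpected container id: " + containerId);
    }
    try {
      return new ContainerIdParts(Long.parseLong(fields[1]),
          Integer.parseInt(fields[2]), Integer.parseInt(fields[3]),
          Long.parseLong(fields[4]));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Unexpected container id: " + containerId, e);
    }
  }

  // Renders the attempt as appattempt_<ts>_<appId padded to 4>_<attempt padded to 6>.
  String attemptIdString() {
    return String.format("appattempt_%d_%04d_%06d", clusterTimestamp, appId, attemptId);
  }
}
```

For example, <<<container_1410901177871_0001_01_000005>>> belongs to attempt <<<appattempt_1410901177871_0001_000001>>>.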
+
+ * After an AM has initialized itself completely, we can start the two clients:
+ one to the ResourceManager and one to the NodeManagers. We set them up with
+ our customized event handlers, which are discussed in detail later in this
+ article.
+
++---+
+ AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+ amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+ amRMClient.init(conf);
+ amRMClient.start();
+
+ containerListener = createNMCallbackHandler();
+ nmClientAsync = new NMClientAsyncImpl(containerListener);
+ nmClientAsync.init(conf);
+ nmClientAsync.start();
++---+
+
+ * The AM has to emit heartbeats to the RM to keep it informed that the AM is
+ alive and still running. The timeout expiry interval at the RM is defined by
+ a config setting accessible via
+ <<<YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS>>> with the default being
+ defined by <<<YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS>>>. The
+ ApplicationMaster needs to register itself with the ResourceManager to
+ start heartbeating.
+
++---+
+ // Register self with ResourceManager
+ // This will start heartbeating to the RM
+ appMasterHostname = NetUtils.getHostname();
+ RegisterApplicationMasterResponse response = amRMClient
+ .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ appMasterTrackingUrl);
++---+
+
+ * The registration response includes the maximum resource capability of the
+ cluster. You may want to use it to check the application's resource requests.
+
++---+
+ // Dump out information about cluster capability as seen by the
+ // resource manager
+ int maxMem = response.getMaximumResourceCapability().getMemory();
+ LOG.info("Max mem capability of resources in this cluster " + maxMem);
+
+ int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
+ LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
+
+ // A resource ask cannot exceed the max.
+ if (containerMemory > maxMem) {
+ LOG.info("Container memory specified above max threshold of cluster."
+ + " Using max value." + ", specified=" + containerMemory + ", max="
+ + maxMem);
+ containerMemory = maxMem;
+ }
+
+ if (containerVirtualCores > maxVCores) {
+ LOG.info("Container virtual cores specified above max threshold of cluster."
+ + " Using max value." + ", specified=" + containerVirtualCores + ", max="
+ + maxVCores);
+ containerVirtualCores = maxVCores;
+ }
+ List<Container> previousAMRunningContainers =
+ response.getContainersFromPreviousAttempts();
+ LOG.info("Received " + previousAMRunningContainers.size()
+ + " previous AM's running containers on AM registration.");
++---+
+
+ * Based on the task requirements, the AM can ask for a set of containers to run
+ its tasks on. We can now calculate how many containers we need and request
+ that many containers.
+
++---+
+ List<Container> previousAMRunningContainers =
+ response.getContainersFromPreviousAttempts();
+ LOG.info("Received " + previousAMRunningContainers.size()
+ + " previous AM's running containers on AM registration.");
+
+ int numTotalContainersToRequest =
+ numTotalContainers - previousAMRunningContainers.size();
+ // Setup ask for containers from RM
+ // Send request for containers to RM
+ // Until we get our fully allocated quota, we keep on polling RM for
+ // containers
+ // Keep looping until all the containers are launched and shell script
+ // executed on them ( regardless of success/failure).
+ for (int i = 0; i < numTotalContainersToRequest; ++i) {
+ ContainerRequest containerAsk = setupContainerAskForRM();
+ amRMClient.addContainerRequest(containerAsk);
+ }
++---+
+
+ * In <<<setupContainerAskForRM()>>>, the following two things need to be set up:
+
+ * Resource capability: Currently, YARN supports memory-based resource
+ requirements, so the request should define how much memory is needed. The
+ value is defined in MB and has to be less than the max capability of the
+ cluster and an exact multiple of the min capability. Memory resources
+ correspond to physical memory limits imposed on the task containers. CPU
+ (vCore) requirements are also supported, as shown in the code.
+
+ * Priority: When asking for sets of containers, an AM may assign different
+ priorities to each set. For example, the Map-Reduce AM may assign a higher
+ priority to containers needed for the Map tasks and a lower priority for
+ the Reduce tasks' containers.
+
+ []
+
++---+
+ private ContainerRequest setupContainerAskForRM() {
+ // setup requirements for hosts
+ // using * as any host will do for the distributed shell app
+ // set the priority for the request
+ Priority pri = Priority.newInstance(requestPriority);
+
+ // Set up resource type requirements
+ // For now, memory and CPU are supported so we set memory and cpu requirements
+ Resource capability = Resource.newInstance(containerMemory,
+ containerVirtualCores);
+
+ ContainerRequest request = new ContainerRequest(capability, null, null,
+ pri);
+ LOG.info("Requested container ask: " + request.toString());
+ return request;
+ }
++---+
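The memory rule in the "Resource capability" item can be illustrated with a small arithmetic sketch. It assumes the scheduler rounds a request up to the next multiple of the minimum allocation and then caps it at the maximum allocation, which is our understanding of the default resource calculator's behaviour. The class and method names are hypothetical. Values are in MB.

```java
// Hypothetical helper illustrating request normalization (values in MB).
// Assumption: round up to a multiple of minAlloc, then cap at maxAlloc.
final class ResourceNormalizer {
  static int normalize(int requested, int minAlloc, int maxAlloc) {
    if (minAlloc <= 0 || maxAlloc < minAlloc) {
      throw new IllegalArgumentException("invalid min/max allocation");
    }
    // Requests below the minimum are bumped up to the minimum.
    int atLeastMin = Math.max(requested, minAlloc);
    // Integer ceiling division, then scale back up: next multiple of minAlloc.
    int rounded = ((atLeastMin + minAlloc - 1) / minAlloc) * minAlloc;
    // Never hand out more than the cluster maximum.
    return Math.min(rounded, maxAlloc);
  }
}
```

With a 1024 MB minimum and an 8192 MB maximum, a 1500 MB ask becomes 2048 MB and a 10000 MB ask is capped at 8192 MB. This is one reason the AM code above clamps <<<containerMemory>>> to the reported maximum.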
+
+ * After the container allocation requests have been sent by the
+ ApplicationMaster, containers are launched asynchronously by the event
+ handler of the <<<AMRMClientAsync>>> client. The handler should implement
+ the <<<AMRMClientAsync.CallbackHandler>>> interface.
+
+ * When there are containers allocated, the handler sets up a thread that runs
+ the code to launch containers. Here we use the name
+ <<<LaunchContainerRunnable>>> to demonstrate. We will talk about the
+ <<<LaunchContainerRunnable>>> class in the following part of this article.
+
++---+
+ @Override
+ public void onContainersAllocated(List<Container> allocatedContainers) {
+ LOG.info("Got response from RM for container ask, allocatedCnt="
+ + allocatedContainers.size());
+ numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
- LOG.info("Launching shell command on a new container."
- + ", containerId=" + allocatedContainer.getId()
- + ", containerNode=" + allocatedContainer.getNodeId().getHost()
- + ":" + allocatedContainer.getNodeId().getPort()
- + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
- + ", containerState" + allocatedContainer.getState()
- + ", containerResourceMemory"
- + allocatedContainer.getResource().getMemory());
-
-
- // Launch and start the container on a separate thread to keep the main
- // thread unblocked as all containers may not be allocated at one go.
- LaunchContainerRunnable runnableLaunchContainer =
- new LaunchContainerRunnable(allocatedContainer);
- Thread launchThread = new Thread(runnableLaunchContainer);
+ LaunchContainerRunnable runnableLaunchContainer =
+ new LaunchContainerRunnable(allocatedContainer, containerListener);
+ Thread launchThread = new Thread(runnableLaunchContainer);
+
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
launchThreads.add(launchThread);
launchThread.start();
}
+ }
++---+
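The handler above starts one thread per allocated container. One alternative, swapped in here and not what the distributed shell does, is to hand each launch to a fixed-size thread pool so that the callback still returns immediately while the number of launcher threads stays bounded. In the sketch below, container IDs are plain strings, and <<<launch()>>> is a hypothetical stand-in for building a <<<ContainerLaunchContext>>> and calling <<<startContainerAsync>>>.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: bounded launcher pool instead of one Thread per container.
final class LauncherSketch {
  private final ExecutorService launchPool;
  final AtomicInteger launched = new AtomicInteger();

  LauncherSketch(int threads) {
    launchPool = Executors.newFixedThreadPool(threads);
  }

  // Mirrors onContainersAllocated(): queue each launch and return at once,
  // so the AM-RM callback thread is never blocked by slow launches.
  void onContainersAllocated(List<String> containerIds) {
    for (String id : containerIds) {
      launchPool.execute(() -> launch(id));
    }
  }

  // Hypothetical stand-in for the real container launch logic.
  private void launch(String containerId) {
    launched.incrementAndGet();
  }

  // Stops accepting work and waits for queued launches to finish.
  boolean shutdownAndWait(long timeoutSeconds) {
    launchPool.shutdown();
    try {
      return launchPool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

The pool should be shut down when the AM finishes, just as the original code keeps <<<launchThreads>>> so that it can join them.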
- // Check what the current available resources in the cluster are
- Resource availableResources = allocateResponse.getAvailableResources();
- // Based on this information, an ApplicationMaster can make appropriate
- // decisions
-
- // Check the completed containers
- // Let's assume we are keeping a count of total completed containers,
- // containers that failed and ones that completed successfully.
- List<ContainerStatus> completedContainers =
- allocateResponse.getCompletedContainersStatuses();
- for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Got container status for containerID= "
- + containerStatus.getContainerId()
- + ", state=" + containerStatus.getState()
- + ", exitStatus=" + containerStatus.getExitStatus()
- + ", diagnostics=" + containerStatus.getDiagnostics());
-
- int exitStatus = containerStatus.getExitStatus();
- if (0 != exitStatus) {
- // container failed
- // -100 is a special case where the container
- // was aborted/pre-empted for some reason
- if (-100 != exitStatus) {
- // application job on container returned a non-zero exit code
- // counts as completed
- numCompletedContainers.incrementAndGet();
- numFailedContainers.incrementAndGet();
- }
- else {
- // something else bad happened
- // app job did not complete for some reason
- // we should re-try as the container was lost for some reason
- // decrementing the requested count so that we ask for an
- // additional one in the next allocate call.
- numRequestedContainers.decrementAndGet();
- // we do not need to release the container as that has already
- // been done by the ResourceManager/NodeManager.
- }
- }
- else {
- // nothing to do
- // container completed successfully
- numCompletedContainers.incrementAndGet();
- numSuccessfulContainers.incrementAndGet();
- }
- }
- }
-+---+
+ * On each heartbeat, the event handler reports the progress of the application
+ through <<<getProgress()>>>.
-
- * After a container has been allocated to the ApplicationMaster, it needs to
- follow a similar process that the Client followed in setting up the
- ContainerLaunchContext for the eventual task that is going to be running on
- the allocated Container. Once the ContainerLaunchContext is defined, the
- ApplicationMaster can then communicate with the ContainerManager to start
- its allocated container.
-
-+---+
-
- //Assuming an allocated Container obtained from AllocateResponse
- Container container;
- // Connect to ContainerManager on the allocated container
- String cmIpPortStr = container.getNodeId().getHost() + ":"
- + container.getNodeId().getPort();
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
- ContainerManager cm =
- (ContainerManager)rpc.getProxy(ContainerManager.class, cmAddress, conf);
-
- // Now we setup a ContainerLaunchContext
- ContainerLaunchContext ctx =
- Records.newRecord(ContainerLaunchContext.class);
-
- ctx.setContainerId(container.getId());
- ctx.setResource(container.getResource());
-
- try {
- ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
- } catch (IOException e) {
- LOG.info(
- "Getting current user failed when trying to launch the container",
- + e.getMessage());
- }
++---+
+ @Override
+ public float getProgress() {
+ // set progress to deliver to RM on next heartbeat
+ float progress = (float) numCompletedContainers.get()
+ / numTotalContainers;
+ return progress;
+ }
++---+
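<<<getProgress()>>> relies on a count of completed containers that the completion callback has to maintain. The sketch below shows one way to keep that count. It follows the exit-status handling used in the earlier revision of this guide: exit status 0 is a success, -100 is assumed to be the "aborted" status (<<<ContainerExitStatus.ABORTED>>>) and triggers a re-request, and any other non-zero status counts as completed but failed. The class is hypothetical and independent of Hadoop.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bookkeeping for completed containers.
final class CompletionTracker {
  // Assumed value of ContainerExitStatus.ABORTED (container lost or aborted).
  static final int ABORTED = -100;

  final int numTotalContainers;
  final AtomicInteger numCompleted = new AtomicInteger();
  final AtomicInteger numFailed = new AtomicInteger();

  CompletionTracker(int numTotalContainers) {
    this.numTotalContainers = numTotalContainers;
  }

  /** Records one completed container; returns true if it should be re-requested. */
  boolean onContainerCompleted(int exitStatus) {
    if (exitStatus == 0) {
      numCompleted.incrementAndGet(); // task finished successfully
      return false;
    }
    if (exitStatus == ABORTED) {
      return true; // container was lost, not failed: ask the RM again
    }
    numCompleted.incrementAndGet(); // task ran but exited non-zero
    numFailed.incrementAndGet();
    return false;
  }

  /** Same formula as getProgress() above, guarded against zero containers. */
  float getProgress() {
    if (numTotalContainers == 0) {
      return 1.0f;
    }
    return (float) numCompleted.get() / numTotalContainers;
  }
}
```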
- // Set the environment
- Map<String, String> unixEnv;
- // Setup the required env.
- // Please note that the launched container does not inherit
- // the environment of the ApplicationMaster so all the
- // necessary environment settings will need to be re-setup
- // for this allocated container.
- ctx.setEnvironment(unixEnv);
-
- // Set the local resources
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
- // Again, the local resources from the ApplicationMaster is not copied over
- // by default to the allocated container. Thus, it is the responsibility
- // of the ApplicationMaster to setup all the necessary local resources
- // needed by the job that will be executed on the allocated container.
-
- // Assume that we are executing a shell script on the allocated container
- // and the shell script's location in the filesystem is known to us.
- Path shellScriptPath;
- LocalResource shellRsrc = Records.newRecord(LocalResource.class);
- shellRsrc.setType(LocalResourceType.FILE);
- shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- shellRsrc.setResource(
- ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
- shellRsrc.setTimestamp(shellScriptPathTimestamp);
- shellRsrc.setSize(shellScriptPathLen);
- localResources.put("MyExecShell.sh", shellRsrc);
-
- ctx.setLocalResources(localResources);
-
- // Set the necessary command to execute on the allocated container
- String command = "/bin/sh ./MyExecShell.sh"
- + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
- + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
-
- List<String> commands = new ArrayList<String>();
- commands.add(command);
- ctx.setCommands(commands);
-
- // Send the start request to the ContainerManager
- StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
- startReq.setContainerLaunchContext(ctx);
- cm.startContainer(startReq);
-+---+
-
- * The ApplicationMaster, as mentioned previously, will get updates of
- completed containers as part of the response from the ApplicationMasterProtocol#allocate
- calls. It can also monitor its launched containers pro-actively by querying
- the ContainerManager for the status.
-
+ []
+
+ * The container launch thread actually launches the containers on NMs. After a
+ container has been allocated to the AM, it needs to follow a process similar
+ to the one the client followed in setting up the <<<ContainerLaunchContext>>>
+ for the eventual task that is going to be running on the allocated Container.
+ Once the <<<ContainerLaunchContext>>> is defined, the AM can start it through
+ the <<<NMClientAsync>>>.
+
++---+
+ // Set the necessary command to execute on the allocated container
+ Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+
+ // Set executable command
+ vargs.add(shellCommand);
+ // Set shell script path
+ if (!scriptPath.isEmpty()) {
+ vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
+ : ExecShellStringPath);
+ }
+
+ // Set args for the shell command if any
+ vargs.add(shellArgs);
+ // Add log redirect params
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+ // Get final command
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ // Set up ContainerLaunchContext, setting local resource, environment,
+ // command and token for constructor.
+
+ // Note for tokens: Set up tokens for the container too. Today, for normal
+ // shell commands, the container in distributed shell doesn't need any
+ // tokens. We are populating them mainly for NodeManagers to be able to
+ // download any files in the distributed file-system. The tokens are
+ // otherwise also useful, for example, when one is running a
+ // "hadoop dfs" command inside the distributed shell.
+ ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
+ localResources, shellEnv, commands, null, allTokens.duplicate(), null);
+ containerListener.addContainer(container.getId(), container);
+ nmClientAsync.startContainerAsync(container, ctx);
+---+
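The command assembly above can be isolated and checked on its own. The sketch below assumes that <<<ApplicationConstants.LOG_DIR_EXPANSION_VAR>>> is the literal <<<\<LOG_DIR\>>>>, which the NodeManager replaces with the container's log directory. It joins the parts with single spaces and skips empty arguments, which avoids the trailing space and doubled spaces that the <<<StringBuilder>>> loop can produce. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that assembles a container command line.
final class CommandBuilder {
  // Assumed literal value of ApplicationConstants.LOG_DIR_EXPANSION_VAR.
  static final String LOG_DIR = "<LOG_DIR>";

  static String build(String executable, String scriptPath, String args) {
    List<String> parts = new ArrayList<>();
    parts.add(executable);
    if (scriptPath != null && !scriptPath.isEmpty()) {
      parts.add(scriptPath); // optional script to run
    }
    if (args != null && !args.isEmpty()) {
      parts.add(args); // optional arguments, skipped when empty
    }
    // Redirect stdout/stderr into the container's log directory.
    parts.add("1>" + LOG_DIR + "/stdout");
    parts.add("2>" + LOG_DIR + "/stderr");
    return String.join(" ", parts);
  }
}
```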
- GetContainerStatusRequest statusReq =
- Records.newRecord(GetContainerStatusRequest.class);
- statusReq.setContainerId(container.getId());
- GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
- LOG.info("Container Status"
- + ", id=" + container.getId()
- + ", status=" + statusResp.getStatus());
-+---+
+ * The <<<NMClientAsync>>> object, together with its event handler, handles container events, including container start, stop, status updates, and errors.
+
+ * After the ApplicationMaster determines the work is done, it needs to unregister itself through the AM-RM client and then stop the client.
+
++---+
+ try {
+ amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+ } catch (YarnException ex) {
+ LOG.error("Failed to unregister application", ex);
+ } catch (IOException e) {
+ LOG.error("Failed to unregister application", e);
+ }
+
+ amRMClient.stop();
++---+
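The <<<appStatus>>> and <<<appMessage>>> passed to <<<unregisterApplicationMaster>>> have to be computed somewhere. The sketch below uses a simple policy of our own, loosely modeled on the distributed shell: the application succeeded only if every container completed and none failed. Otherwise it fails with a short diagnostic message. <<<FinalAppStatus>>> is a hypothetical stand-in for <<<FinalApplicationStatus>>>.

```java
// Hypothetical stand-in for FinalApplicationStatus.
enum FinalAppStatus { SUCCEEDED, FAILED }

// Sketch of deciding what to report when unregistering.
final class FinishDecision {
  final FinalAppStatus status;
  final String message; // diagnostics for unregisterApplicationMaster

  private FinishDecision(FinalAppStatus status, String message) {
    this.status = status;
    this.message = message;
  }

  static FinishDecision decide(int total, int completed, int failed) {
    if (failed == 0 && completed == total) {
      return new FinishDecision(FinalAppStatus.SUCCEEDED, null);
    }
    String message = "Diagnostics. total=" + total + ", completed=" + completed
        + ", failed=" + failed;
    return new FinishDecision(FinalAppStatus.FAILED, message);
  }
}
```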
~~** Defining the context in which your code runs
-~~*** Container Resource Requests
+~~*** Container Resource Requests
-~~*** Local Resources
+~~*** Local Resources
-~~*** Environment
+~~*** Environment
-~~**** Managing the CLASSPATH
+~~**** Managing the CLASSPATH
-~~** Security
+~~** Security
-* FAQ
+* FAQ
-** How can I distribute my application's jars to all of the nodes in the YARN
+** How can I distribute my application's jars to all of the nodes in the YARN
cluster that need it?
- You can use the LocalResource to add resources to your application request.
- This will cause YARN to distribute the resource to the ApplicationMaster node.
- If the resource is a tgz, zip, or jar - you can have YARN unzip it. Then, all
- you need to do is add the unzipped folder to your classpath.
- For example, when creating your application request:
-
-+---+
- File packageFile = new File(packagePath);
- Url packageUrl = ConverterUtils.getYarnUrlFromPath(
- FileContext.getFileContext.makeQualified(new Path(packagePath)));
-
- packageResource.setResource(packageUrl);
- packageResource.setSize(packageFile.length());
- packageResource.setTimestamp(packageFile.lastModified());
- packageResource.setType(LocalResourceType.ARCHIVE);
- packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
- resource.setMemory(memory)
- containerCtx.setResource(resource)
- containerCtx.setCommands(ImmutableList.of(
- "java -cp './package/*' some.class.to.Run "
- + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
- + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"))
- containerCtx.setLocalResources(
- Collections.singletonMap("package", packageResource))
- appCtx.setApplicationId(appId)
- appCtx.setUser(user.getShortUserName)
- appCtx.setAMContainerSpec(containerCtx)
- request.setApplicationSubmissionContext(appCtx)
- applicationsManager.submitApplication(request)
-+---+
-
- As you can see, the setLocalResources command takes a map of names to
- resources. The name becomes a sym link in your application's cwd, so you can
- just refer to the artifacts inside by using ./package/*.
-
- Note: Java's classpath (cp) argument is VERY sensitive.
+ * You can use a <<<LocalResource>>> to add resources to your application
+ request. This will cause YARN to distribute the resource to the
+ ApplicationMaster node. If the resource is a tgz, zip, or jar, you can have
+ YARN unpack it. Then, all you need to do is add the unpacked folder to your
+ classpath. For example, when creating your application request:
+
++---+
+ File packageFile = new File(packagePath);
+ // org.apache.hadoop.yarn.api.records.URL, not java.net.URL
+ URL packageUrl = ConverterUtils.getYarnUrlFromPath(
+ FileContext.getFileContext().makeQualified(new Path(packagePath)));
+
+ LocalResource packageResource = Records.newRecord(LocalResource.class);
+ packageResource.setResource(packageUrl);
+ packageResource.setSize(packageFile.length());
+ packageResource.setTimestamp(packageFile.lastModified());
+ packageResource.setType(LocalResourceType.ARCHIVE);
+ packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ // The AM container's resource requirement belongs to the submission context
+ Resource resource = Records.newRecord(Resource.class);
+ resource.setMemory(memory);
+ appCtx.setResource(resource);
+ containerCtx.setCommands(ImmutableList.of(
+ "java -cp './package/*' some.class.to.Run "
+ + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
+ + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
+ containerCtx.setLocalResources(
+ Collections.singletonMap("package", packageResource));
+ appCtx.setApplicationId(appId);
+ appCtx.setAMContainerSpec(containerCtx);
+ yarnClient.submitApplication(appCtx);
++---+
+
+ As you can see, the <<<setLocalResources>>> command takes a map of names to
+ resources. The name becomes a symlink in your application's current working
+ directory, so you can refer to the artifacts inside by using ./package/*.
+
+ Note: Java's classpath (cp) argument is VERY sensitive.
Make sure you get the syntax EXACTLY correct.
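One way to avoid classpath mistakes is to build the command in a single place. The sketch below makes two assumptions: the container command is executed by a Unix shell, and entries are joined with ":" (Windows uses ";"). The wildcard is wrapped in single quotes so that the shell passes <<<./package/*>>> through unchanged and the JVM expands it to the jars in that directory. The class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical helper that builds a java launch command for a container.
final class JavaCommand {
  static String build(String mainClass, List<String> classpathEntries) {
    // ':' is the Unix classpath separator (assumption: Linux NodeManagers).
    String cp = String.join(":", classpathEntries);
    // Single quotes stop the launch shell from glob-expanding '*';
    // the JVM expands "dir/*" to the jars in dir itself.
    return "java -cp '" + cp + "' " + mainClass
        + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr";
  }
}
```

Called with the "package" symlink from the example above, it produces the same command string that the <<<setCommands>>> call builds by hand.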
- Once your package is distributed to your ApplicationMaster, you'll need to
- follow the same process whenever your ApplicationMaster starts a new container
- (assuming you want the resources to be sent to your container). The code for
- this is the same. You just need to make sure that you give your
- ApplicationMaster the package path (either HDFS, or local), so that it can
- send the resource URL along with the container ctx.
-
-** How do I get the ApplicationMaster's ApplicationAttemptId?
-
-
- The ApplicationAttemptId will be passed to the ApplicationMaster via the
- environment and the value from the environment can be converted into an
- ApplicationAttemptId object via the ConverterUtils helper function.
-
-** My container is being killed by the Node Manager
-
- This is likely due to high memory usage exceeding your requested container
- memory size. There are a number of reasons that can cause this. First, look
- at the process tree that the node manager dumps when it kills your container.
- The two things you're interested in are physical memory and virtual memory.
- If you have exceeded physical memory limits your app is using too much physical
- memory. If you're running a Java app, you can use -hprof to look at what is
- taking up space in the heap. If you have exceeded virtual memory, you may
- need to increase the value of the the cluster-wide configuration variable
- <<<yarn.nodemanager.vmem-pmem-ratio>>>.
+ Once your package is distributed to your AM, you'll need to follow the same
+ process whenever your AM starts a new container (assuming you want the
+ resources to be sent to your container). The code for this is the same. You
+ just need to make sure that you give your AM the package path (either HDFS or
+ local), so that it can send the resource URL along with the container launch
+ context.
+
+** How do I get the ApplicationMaster's <<<ApplicationAttemptId>>>?
+
+ * The <<<ApplicationAttemptId>>> will be passed to the AM via the environment
+ and the value from the environment can be converted into an
+ <<<ApplicationAttemptId>>> object via the ConverterUtils helper function.
+
+** Why is my container killed by the NodeManager?
+
+ * This is likely due to high memory usage exceeding your requested container
+ memory size. There are a number of reasons that can cause this. First, look
+ at the process tree that the NodeManager dumps when it kills your container.
+ The two things you're interested in are physical memory and virtual memory.
+ If you have exceeded physical memory limits, your app is using too much
+ physical memory. If you're running a Java app, you can use -hprof to look at
+ what is taking up space in the heap. If you have exceeded virtual memory, you
+ may need to increase the value of the cluster-wide configuration variable
+ <<<yarn.nodemanager.vmem-pmem-ratio>>>.
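The virtual memory check can be reasoned about with simple arithmetic: a container's virtual memory allowance is roughly its requested physical memory multiplied by yarn.nodemanager.vmem-pmem-ratio, whose default in yarn-default.xml is 2.1. The hypothetical helper below (names are ours; the NodeManager's exact rounding is an assumption) shows that calculation and the smallest ratio that would accommodate an observed virtual memory size.

```java
/**
 * Hypothetical illustration of the NodeManager's virtual-memory allowance:
 * roughly (requested physical memory) x yarn.nodemanager.vmem-pmem-ratio.
 * Truncation to whole MB here is an assumption made for the sketch.
 */
final class VmemLimitSketch {
    // Default value of yarn.nodemanager.vmem-pmem-ratio in yarn-default.xml.
    static final double DEFAULT_VMEM_PMEM_RATIO = 2.1;

    /** Approximate virtual-memory limit, in MB, for a container that requested pmemMb. */
    static long vmemLimitMb(long pmemMb, double vmemPmemRatio) {
        return (long) (pmemMb * vmemPmemRatio);
    }

    /** Smallest ratio that would let a process using vmemUsedMb fit in a pmemMb container. */
    static double minRatioFor(long vmemUsedMb, long pmemMb) {
        if (pmemMb <= 0) {
            throw new IllegalArgumentException("pmemMb must be positive");
        }
        return (double) vmemUsedMb / pmemMb;
    }
}
```

For example, a 1024 MB container under the default ratio gets about 2150 MB of virtual memory, so a process whose tree dump shows 3072 MB of virtual memory would need a ratio of at least 3.0, or a larger container request.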
** How do I include native libraries?
-
- Setting -Djava.library.path on the command line while launching a container
- can cause native libraries used by Hadoop to not be loaded correctly and can
- result in errors. It is cleaner to use LD_LIBRARY_PATH instead.
+ * Setting <<<-Djava.library.path>>> on the command line while launching a
+ container can prevent the native libraries used by Hadoop from loading
+ correctly and can result in errors. It is cleaner to use
+ <<<LD_LIBRARY_PATH>>> instead.
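The LD_LIBRARY_PATH approach amounts to putting the library directory into the container's environment instead of onto the java command line. The helper below is a hypothetical sketch of building that environment map; the class name, the ":" separator (Linux) and the "./native" directory in the usage note are assumptions. In a real client or AM the resulting map would be passed to ContainerLaunchContext.setEnvironment(...). The base map is the environment you are assembling for the container, not the client's own System.getenv().

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: exposes a native library directory to a container via
 * LD_LIBRARY_PATH rather than -Djava.library.path on the launch command.
 */
final class NativeLibEnvSketch {
    static Map<String, String> withNativeLibDir(Map<String, String> baseEnv, String nativeLibDir) {
        // Copy so the caller's map (possibly immutable) is left untouched.
        Map<String, String> env = new HashMap<>(baseEnv);
        String existing = env.get("LD_LIBRARY_PATH");
        // Append rather than overwrite so existing entries keep their precedence.
        env.put("LD_LIBRARY_PATH",
                existing == null || existing.isEmpty() ? nativeLibDir : existing + ":" + nativeLibDir);
        return env;
    }
}
```

For example, withNativeLibDir(env, "./native") on a map that already holds /opt/hadoop/lib/native yields /opt/hadoop/lib/native:./native. Appending keeps any directories already listed searched first; prepending would let your own libraries take priority.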
* Useful Links
- * {{{https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf}Map Reduce Next Generation Architecture}}
+ * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html}YARN Architecture}}
+
+ * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html}YARN Capacity Scheduler}}
+
+ * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html}YARN Fair Scheduler}}
+
+* Sample code
- * {{{http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/}Map Reduce Next Generation Scheduler}}
+ * YARN distributed shell: in the <<<hadoop-yarn-applications-distributedshell>>>
+ project after you set up your development environment.
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/YarnCommands.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/YarnCommands.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/YarnCommands.apt.vm (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/YarnCommands.apt.vm Tue Aug 19 23:49:39 2014
@@ -18,8 +18,6 @@
Yarn Commands
- \[ {{{./index.html}Go Back}} \]
-
%{toc|section=1|fromDepth=0}
* Overview
@@ -159,9 +157,18 @@ Usage: yarn [--config confdir] COMMAND
Start the ResourceManager
-------
- Usage: yarn resourcemanager
+ Usage: yarn resourcemanager [-format-state-store]
-------
+*---------------+--------------+
+|| COMMAND_OPTIONS || Description |
+*---------------+--------------+
+| -format-state-store | Formats the RMStateStore. This will clear the
+| | RMStateStore and is useful if past applications are no
+| | longer needed. This should be run only when the
+| | ResourceManager is not running.
+*---------------+--------------+
+
** nodemanager
Start the NodeManager