You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2012/07/24 19:33:16 UTC
svn commit: r1365185 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-applications/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distribut...
Author: acmurthy
Date: Tue Jul 24 17:33:15 2012
New Revision: 1365185
URL: http://svn.apache.org/viewvc?rev=1365185&view=rev
Log:
MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN. Contributed by Bikas Saha.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1365185&r1=1365184&r2=1365185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jul 24 17:33:15 2012
@@ -161,6 +161,9 @@ Release 2.1.0-alpha - Unreleased
MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu)
+ MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN.
+ (Bikas Saha via acmurthy)
+
IMPROVEMENTS
MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1365185&r1=1365184&r2=1365185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Jul 24 17:33:15 2012
@@ -290,7 +290,10 @@ public class ApplicationMaster {
Map<String, String> envs = System.getenv();
appAttemptID = Records.newRecord(ApplicationAttemptId.class);
- if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+ if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
+ appAttemptID = ConverterUtils.toApplicationAttemptId(envs
+ .get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
+ } else if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
if (cliParser.hasOption("app_attempt_id")) {
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml?rev=1365185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml Tue Jul 24 17:33:15 2012
@@ -0,0 +1,110 @@
+<?xml version="1.0"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>hadoop-yarn-applications</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>3.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-applications-unmanaged-am-launcher</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <name>hadoop-yarn-applications-unmanaged-am-launcher</name>
+
+ <properties>
+ <!-- Needed for generating FindBugs warnings using parent pom -->
+ <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-applications-distributedshell</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>build-classpath</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>build-classpath</goal>
+ </goals>
+ <configuration>
+ <!-- needed to run the unit test for DS to generate the required classpath
+ that is required in the env of the launch container in the mini yarn cluster -->
+ <outputFile>target/classes/yarn-apps-am-generated-classpath</outputFile>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <environmentVariables>
+ <JAVA_HOME>${java.home}</JAVA_HOME>
+ </environmentVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java?rev=1365185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java Tue Jul 24 17:33:15 2012
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The UnmanagedLauncher is a simple client that launches and unmanaged AM. An
+ * unmanagedAM is an AM that is not launched and managed by the RM. The client
+ * creates a new application on the RM and negotiates a new attempt id. Then it
+ * waits for the RM app state to reach be YarnApplicationState.ACCEPTED after
+ * which it spawns the AM in another process and passes it the attempt id via
+ * env variable ApplicationConstants.AM_APP_ATTEMPT_ID_ENV. The AM can be in any
+ * language. The AM can register with the RM using the attempt id and proceed as
+ * normal. The client redirects app stdout and stderr to its own stdout and
+ * stderr and waits for the AM process to exit. Then it waits for the RM to
+ * report app completion.
+ */
+public class UnmanagedAMLauncher {
+ private static final Log LOG = LogFactory.getLog(UnmanagedAMLauncher.class);
+
+ private Configuration conf;
+
+ // RPC to communicate to RM
+ private YarnRPC rpc;
+
+ // Handle to talk to the Resource Manager/Applications Manager
+ private ClientRMProtocol rmClient;
+
+ // Application master specific info to register a new Application with RM/ASM
+ private String appName = "";
+ // App master priority
+ private int amPriority = 0;
+ // Queue for App master
+ private String amQueue = "";
+ // cmd to start AM
+ private String amCmd = null;
+ // set the classpath explicitly
+ private String classpath = null;
+
+ /**
+ * @param args
+ * Command line arguments
+ */
+ public static void main(String[] args) {
+ try {
+ UnmanagedAMLauncher client = new UnmanagedAMLauncher();
+ LOG.info("Initializing Client");
+ boolean doRun = client.init(args);
+ if (!doRun) {
+ System.exit(0);
+ }
+ client.run();
+ } catch (Throwable t) {
+ LOG.fatal("Error running Client", t);
+ System.exit(1);
+ }
+ }
+
+ /**
+ */
+ public UnmanagedAMLauncher(Configuration conf) throws Exception {
+ // Set up RPC
+ this.conf = conf;
+ rpc = YarnRPC.create(conf);
+ }
+
+ public UnmanagedAMLauncher() throws Exception {
+ this(new Configuration());
+ }
+
+ private void printUsage(Options opts) {
+ new HelpFormatter().printHelp("Client", opts);
+ }
+
+ public boolean init(String[] args) throws ParseException {
+
+ Options opts = new Options();
+ opts.addOption("appname", true,
+ "Application Name. Default value - UnmanagedAM");
+ opts.addOption("priority", true, "Application Priority. Default 0");
+ opts.addOption("queue", true,
+ "RM Queue in which this application is to be submitted");
+ opts.addOption("master_memory", true,
+ "Amount of memory in MB to be requested to run the application master");
+ opts.addOption("cmd", true, "command to start unmanaged AM (required)");
+ opts.addOption("classpath", true, "additional classpath");
+ opts.addOption("help", false, "Print usage");
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+
+ if (args.length == 0) {
+ printUsage(opts);
+ throw new IllegalArgumentException(
+ "No args specified for client to initialize");
+ }
+
+ if (cliParser.hasOption("help")) {
+ printUsage(opts);
+ return false;
+ }
+
+ appName = cliParser.getOptionValue("appname", "UnmanagedAM");
+ amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+ amQueue = cliParser.getOptionValue("queue", "");
+ classpath = cliParser.getOptionValue("classpath", null);
+
+ amCmd = cliParser.getOptionValue("cmd");
+ if (amCmd == null) {
+ printUsage(opts);
+ throw new IllegalArgumentException(
+ "No cmd specified for application master");
+ }
+
+ return true;
+ }
+
+ private void connectToRM() throws IOException {
+ YarnConfiguration yarnConf = new YarnConfiguration(conf);
+ InetSocketAddress rmAddress = yarnConf.getSocketAddr(
+ YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
+ rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
+ rmAddress, conf));
+ }
+
+ private GetNewApplicationResponse getApplication() throws YarnRemoteException {
+ GetNewApplicationRequest request = Records
+ .newRecord(GetNewApplicationRequest.class);
+ GetNewApplicationResponse response = rmClient.getNewApplication(request);
+ LOG.info("Got new application id=" + response.getApplicationId());
+ return response;
+ }
+
+ public void launchAM(ApplicationAttemptId attemptId) throws IOException {
+ Map<String, String> env = System.getenv();
+ ArrayList<String> envAMList = new ArrayList<String>();
+ boolean setClasspath = false;
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if(key.equals("CLASSPATH")) {
+ setClasspath = true;
+ if(classpath != null) {
+ value = value + File.pathSeparator + classpath;
+ }
+ }
+ envAMList.add(key + "=" + value);
+ }
+
+ if(!setClasspath && classpath!=null) {
+ envAMList.add("CLASSPATH="+classpath);
+ }
+
+ envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);
+
+ String[] envAM = new String[envAMList.size()];
+ Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
+
+ final BufferedReader errReader =
+ new BufferedReader(new InputStreamReader(amProc
+ .getErrorStream()));
+ final BufferedReader inReader =
+ new BufferedReader(new InputStreamReader(amProc
+ .getInputStream()));
+
+ // read error and input streams as this would free up the buffers
+ // free the error stream buffer
+ Thread errThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ String line = errReader.readLine();
+ while((line != null) && !isInterrupted()) {
+ System.err.println(line);
+ line = errReader.readLine();
+ }
+ } catch(IOException ioe) {
+ LOG.warn("Error reading the error stream", ioe);
+ }
+ }
+ };
+ Thread outThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ String line = inReader.readLine();
+ while((line != null) && !isInterrupted()) {
+ System.out.println(line);
+ line = inReader.readLine();
+ }
+ } catch(IOException ioe) {
+ LOG.warn("Error reading the out stream", ioe);
+ }
+ }
+ };
+ try {
+ errThread.start();
+ outThread.start();
+ } catch (IllegalStateException ise) { }
+
+ // wait for the process to finish and check the exit code
+ try {
+ int exitCode = amProc.waitFor();
+ LOG.info("AM process exited with value: " + exitCode);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ // make sure that the error thread exits
+ // on Windows these threads sometimes get stuck and hang the execution
+ // timeout and join later after destroying the process.
+ errThread.join();
+ outThread.join();
+ errReader.close();
+ inReader.close();
+ } catch (InterruptedException ie) {
+ LOG.info("ShellExecutor: Interrupted while reading the error/out stream",
+ ie);
+ } catch (IOException ioe) {
+ LOG.warn("Error while closing the error/out stream", ioe);
+ }
+ amProc.destroy();
+ }
+
+ public boolean run() throws IOException {
+ LOG.info("Starting Client");
+
+ // Connect to ResourceManager
+ connectToRM();
+ assert (rmClient != null);
+
+ // Get a new application id
+ GetNewApplicationResponse newApp = getApplication();
+ ApplicationId appId = newApp.getApplicationId();
+
+ // Create launch context for app master
+ LOG.info("Setting up application submission context for ASM");
+ ApplicationSubmissionContext appContext = Records
+ .newRecord(ApplicationSubmissionContext.class);
+
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName(appName);
+
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(amPriority);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue(amQueue);
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records
+ .newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+
+ // unmanaged AM
+ appContext.setUnmanagedAM(true);
+ LOG.info("Setting unmanaged AM");
+
+ // Create the request to send to the applications manager
+ SubmitApplicationRequest appRequest = Records
+ .newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+
+ // Submit the application to the applications manager
+ LOG.info("Submitting application to ASM");
+ rmClient.submitApplication(appRequest);
+
+ // Monitor the application to wait for launch state
+ ApplicationReport appReport = monitorApplication(appId,
+ EnumSet.of(YarnApplicationState.ACCEPTED));
+ ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+ LOG.info("Launching application with id: " + attemptId);
+
+ // launch AM
+ launchAM(attemptId);
+
+ // Monitor the application for end state
+ appReport = monitorApplication(appId, EnumSet.of(
+ YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+ YarnApplicationState.FINISHED));
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
+
+ LOG.info("App ended with state: " + appReport.getYarnApplicationState()
+ + " and status: " + appStatus);
+ if (YarnApplicationState.FINISHED == appState
+ && FinalApplicationStatus.SUCCEEDED == appStatus) {
+ LOG.info("Application has completed successfully.");
+ return true;
+ } else {
+ LOG.info("Application did finished unsuccessfully." + " YarnState="
+ + appState.toString() + ", FinalStatus=" + appStatus.toString());
+ return false;
+ }
+ }
+
+ /**
+ * Monitor the submitted application for completion. Kill application if time
+ * expires.
+ *
+ * @param appId
+ * Application Id of application to be monitored
+ * @return true if application completed successfully
+ * @throws YarnRemoteException
+ */
+ private ApplicationReport monitorApplication(ApplicationId appId,
+ Set<YarnApplicationState> finalState) throws YarnRemoteException {
+
+ while (true) {
+
+ // Check app status every 1 second.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.debug("Thread sleep in monitoring loop interrupted");
+ }
+
+ // Get application report for the appId we are interested in
+ GetApplicationReportRequest reportRequest = Records
+ .newRecord(GetApplicationReportRequest.class);
+ reportRequest.setApplicationId(appId);
+ GetApplicationReportResponse reportResponse = rmClient
+ .getApplicationReport(reportRequest);
+ ApplicationReport report = reportResponse.getApplicationReport();
+
+ LOG.info("Got application report from ASM for" + ", appId="
+ + appId.getId() + ", appAttemptId="
+ + report.getCurrentApplicationAttemptId() + ", clientToken="
+ + report.getClientToken() + ", appDiagnostics="
+ + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
+ + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+ + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
+ + ", yarnAppState=" + report.getYarnApplicationState().toString()
+ + ", distributedFinalState="
+ + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
+ + report.getTrackingUrl() + ", appUser=" + report.getUser());
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ if (finalState.contains(state)) {
+ return report;
+ }
+
+ }
+
+ }
+
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1365185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Tue Jul 24 17:33:15 2012
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.URL;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestUnmanagedAMLauncher {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestUnmanagedAMLauncher.class);
+
+ protected static MiniYARNCluster yarnCluster = null;
+ protected static Configuration conf = new Configuration();
+
+ @BeforeClass
+ public static void setup() throws InterruptedException, IOException {
+ LOG.info("Starting up YARN cluster");
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
+ if (yarnCluster == null) {
+ yarnCluster = new MiniYARNCluster(
+ TestUnmanagedAMLauncher.class.getName(), 1, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+ URL url = Thread.currentThread().getContextClassLoader()
+ .getResource("yarn-site.xml");
+ if (url == null) {
+ throw new RuntimeException(
+ "Could not find 'yarn-site.xml' dummy file in classpath");
+ }
+ OutputStream os = new FileOutputStream(new File(url.getPath()));
+ yarnCluster.getConfig().writeXml(os);
+ os.close();
+ }
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (yarnCluster != null) {
+ yarnCluster.stop();
+ yarnCluster = null;
+ }
+ }
+
+ private static String getTestRuntimeClasspath() {
+
+ InputStream classpathFileStream = null;
+ BufferedReader reader = null;
+ String envClassPath = "";
+
+ LOG.info("Trying to generate classpath for app master from current thread's classpath");
+ try {
+
+ // Create classpath from generated classpath
+ // Check maven pom.xml for generated classpath info
+ // Works if compile time env is same as runtime. Mainly tests.
+ ClassLoader thisClassLoader = Thread.currentThread()
+ .getContextClassLoader();
+ String generatedClasspathFile = "yarn-apps-am-generated-classpath";
+ classpathFileStream = thisClassLoader
+ .getResourceAsStream(generatedClasspathFile);
+ if (classpathFileStream == null) {
+ LOG.info("Could not classpath resource from class loader");
+ return envClassPath;
+ }
+ LOG.info("Readable bytes from stream=" + classpathFileStream.available());
+ reader = new BufferedReader(new InputStreamReader(classpathFileStream));
+ String cp = reader.readLine();
+ if (cp != null) {
+ envClassPath += cp.trim() + File.pathSeparator;
+ }
+ // yarn-site.xml at this location contains proper config for mini cluster
+ URL url = thisClassLoader.getResource("yarn-site.xml");
+ envClassPath += new File(url.getFile()).getParent();
+ } catch (IOException e) {
+ LOG.info("Could not find the necessary resource to generate class path for tests. Error="
+ + e.getMessage());
+ }
+
+ try {
+ if (classpathFileStream != null) {
+ classpathFileStream.close();
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ } catch (IOException e) {
+ LOG.info("Failed to close class path file stream or reader. Error="
+ + e.getMessage());
+ }
+ return envClassPath;
+ }
+
+ @Test
+ public void testDSShell() throws Exception {
+ String classpath = getTestRuntimeClasspath();
+ String javaHome = System.getenv("JAVA_HOME");
+ if (javaHome == null) {
+ LOG.fatal("JAVA_HOME not defined. Test not running.");
+ return;
+ }
+ // start dist-shell with 0 containers because container launch will fail if
+ // there are no dist cache resources.
+ String[] args = {
+ "--classpath",
+ classpath,
+ "--cmd",
+ javaHome
+ + "/bin/java -Xmx512m "
+ + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
+ + "--container_memory 128 --num_containers 0 --priority 0 --shell_command ls" };
+
+ LOG.info("Initializing Launcher");
+ UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
+ yarnCluster.getConfig()));
+ boolean initSuccess = launcher.init(args);
+ Assert.assertTrue(initSuccess);
+ LOG.info("Running Launcher");
+ boolean result = launcher.run();
+
+ LOG.info("Launcher run completed. Result=" + result);
+ Assert.assertTrue(result);
+
+ }
+
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml?rev=1365185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml Tue Jul 24 17:33:15 2012
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<configuration>
+ <!-- Dummy (invalid) config file to be overwriten by TestUnmanagedAMLauncher with MiniCluster configuration. -->
+</configuration>
+
+
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml?rev=1365185&r1=1365184&r2=1365185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml Tue Jul 24 17:33:15 2012
@@ -30,6 +30,7 @@
<modules>
<module>hadoop-yarn-applications-distributedshell</module>
+ <module>hadoop-yarn-applications-unmanaged-am-launcher</module>
</modules>
<profiles>
<profile>