You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2012/07/24 19:33:16 UTC

svn commit: r1365185 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-applications/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distribut...

Author: acmurthy
Date: Tue Jul 24 17:33:15 2012
New Revision: 1365185

URL: http://svn.apache.org/viewvc?rev=1365185&view=rev
Log:
MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN. Contributed by Bikas Saha.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1365185&r1=1365184&r2=1365185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jul 24 17:33:15 2012
@@ -161,6 +161,9 @@ Release 2.1.0-alpha - Unreleased 
 
     MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu)
 
+    MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN.
+    (Bikas Saha via acmurthy) 
+
   IMPROVEMENTS
 
     MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1365185&r1=1365184&r2=1365185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Jul 24 17:33:15 2012
@@ -290,7 +290,10 @@ public class ApplicationMaster {
     Map<String, String> envs = System.getenv();
 
     appAttemptID = Records.newRecord(ApplicationAttemptId.class);
-    if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+    if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
+      appAttemptID = ConverterUtils.toApplicationAttemptId(envs
+          .get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
+    } else if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
       if (cliParser.hasOption("app_attempt_id")) {
         String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
         appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml?rev=1365185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml Tue Jul 24 17:33:15 2012
@@ -0,0 +1,110 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hadoop-yarn-applications</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-yarn-applications-unmanaged-am-launcher</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <name>hadoop-yarn-applications-unmanaged-am-launcher</name>
+
+  <properties>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-applications-distributedshell</artifactId>
+      <version>3.0.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>build-classpath</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <!-- needed to run the unit test for DS to generate the required classpath 
+                   that is required in the env of the launch container in the mini yarn cluster -->
+              <outputFile>target/classes/yarn-apps-am-generated-classpath</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <JAVA_HOME>${java.home}</JAVA_HOME>
+          </environmentVariables>
+       </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java?rev=1365185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java Tue Jul 24 17:33:15 2012
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The UnmanagedLauncher is a simple client that launches and unmanaged AM. An
+ * unmanagedAM is an AM that is not launched and managed by the RM. The client
+ * creates a new application on the RM and negotiates a new attempt id. Then it
+ * waits for the RM app state to reach be YarnApplicationState.ACCEPTED after
+ * which it spawns the AM in another process and passes it the attempt id via
+ * env variable ApplicationConstants.AM_APP_ATTEMPT_ID_ENV. The AM can be in any
+ * language. The AM can register with the RM using the attempt id and proceed as
+ * normal. The client redirects app stdout and stderr to its own stdout and
+ * stderr and waits for the AM process to exit. Then it waits for the RM to
+ * report app completion.
+ */
+public class UnmanagedAMLauncher {
+  private static final Log LOG = LogFactory.getLog(UnmanagedAMLauncher.class);
+
+  private Configuration conf;
+
+  // RPC to communicate to RM
+  private YarnRPC rpc;
+
+  // Handle to talk to the Resource Manager/Applications Manager
+  private ClientRMProtocol rmClient;
+
+  // Application master specific info to register a new Application with RM/ASM
+  private String appName = "";
+  // App master priority
+  private int amPriority = 0;
+  // Queue for App master
+  private String amQueue = "";
+  // cmd to start AM
+  private String amCmd = null;
+  // set the classpath explicitly
+  private String classpath = null;
+
+  /**
+   * @param args
+   *          Command line arguments
+   */
+  public static void main(String[] args) {
+    try {
+      UnmanagedAMLauncher client = new UnmanagedAMLauncher();
+      LOG.info("Initializing Client");
+      boolean doRun = client.init(args);
+      if (!doRun) {
+        System.exit(0);
+      }
+      client.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running Client", t);
+      System.exit(1);
+    }
+  }
+
+  /**
+   */
+  public UnmanagedAMLauncher(Configuration conf) throws Exception {
+    // Set up RPC
+    this.conf = conf;
+    rpc = YarnRPC.create(conf);
+  }
+
+  public UnmanagedAMLauncher() throws Exception {
+    this(new Configuration());
+  }
+
+  private void printUsage(Options opts) {
+    new HelpFormatter().printHelp("Client", opts);
+  }
+
+  public boolean init(String[] args) throws ParseException {
+
+    Options opts = new Options();
+    opts.addOption("appname", true,
+        "Application Name. Default value - UnmanagedAM");
+    opts.addOption("priority", true, "Application Priority. Default 0");
+    opts.addOption("queue", true,
+        "RM Queue in which this application is to be submitted");
+    opts.addOption("master_memory", true,
+        "Amount of memory in MB to be requested to run the application master");
+    opts.addOption("cmd", true, "command to start unmanaged AM (required)");
+    opts.addOption("classpath", true, "additional classpath");
+    opts.addOption("help", false, "Print usage");
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (args.length == 0) {
+      printUsage(opts);
+      throw new IllegalArgumentException(
+          "No args specified for client to initialize");
+    }
+
+    if (cliParser.hasOption("help")) {
+      printUsage(opts);
+      return false;
+    }
+
+    appName = cliParser.getOptionValue("appname", "UnmanagedAM");
+    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+    amQueue = cliParser.getOptionValue("queue", "");
+    classpath = cliParser.getOptionValue("classpath", null);
+
+    amCmd = cliParser.getOptionValue("cmd");
+    if (amCmd == null) {
+      printUsage(opts);
+      throw new IllegalArgumentException(
+          "No cmd specified for application master");
+    }
+
+    return true;
+  }
+
+  private void connectToRM() throws IOException {
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    InetSocketAddress rmAddress = yarnConf.getSocketAddr(
+        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_PORT);
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
+        rmAddress, conf));
+  }
+
+  private GetNewApplicationResponse getApplication() throws YarnRemoteException {
+    GetNewApplicationRequest request = Records
+        .newRecord(GetNewApplicationRequest.class);
+    GetNewApplicationResponse response = rmClient.getNewApplication(request);
+    LOG.info("Got new application id=" + response.getApplicationId());
+    return response;
+  }
+
+  public void launchAM(ApplicationAttemptId attemptId) throws IOException {
+    Map<String, String> env = System.getenv();
+    ArrayList<String> envAMList = new ArrayList<String>();
+    boolean setClasspath = false;
+    for (Map.Entry<String, String> entry : env.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if(key.equals("CLASSPATH")) {
+        setClasspath = true;
+        if(classpath != null) {
+          value = value + File.pathSeparator + classpath;
+        }
+      }
+      envAMList.add(key + "=" + value);
+    }
+    
+    if(!setClasspath && classpath!=null) {
+      envAMList.add("CLASSPATH="+classpath);
+    }
+        
+    envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);
+
+    String[] envAM = new String[envAMList.size()];
+    Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
+
+    final BufferedReader errReader = 
+        new BufferedReader(new InputStreamReader(amProc
+                                                 .getErrorStream()));
+    final BufferedReader inReader = 
+        new BufferedReader(new InputStreamReader(amProc
+                                                 .getInputStream()));
+    
+    // read error and input streams as this would free up the buffers
+    // free the error stream buffer
+    Thread errThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          String line = errReader.readLine();
+          while((line != null) && !isInterrupted()) {
+            System.err.println(line);
+            line = errReader.readLine();
+          }
+        } catch(IOException ioe) {
+          LOG.warn("Error reading the error stream", ioe);
+        }
+      }
+    };
+    Thread outThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          String line = inReader.readLine();
+          while((line != null) && !isInterrupted()) {
+            System.out.println(line);
+            line = inReader.readLine();
+          }
+        } catch(IOException ioe) {
+          LOG.warn("Error reading the out stream", ioe);
+        }
+      }
+    };
+    try {
+      errThread.start();
+      outThread.start();
+    } catch (IllegalStateException ise) { }
+
+    // wait for the process to finish and check the exit code
+    try {
+      int exitCode = amProc.waitFor();
+      LOG.info("AM process exited with value: " + exitCode);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    try {
+      // make sure that the error thread exits
+      // on Windows these threads sometimes get stuck and hang the execution
+      // timeout and join later after destroying the process.
+      errThread.join();
+      outThread.join();
+      errReader.close();
+      inReader.close();
+    } catch (InterruptedException ie) {
+      LOG.info("ShellExecutor: Interrupted while reading the error/out stream",
+          ie);
+    } catch (IOException ioe) {
+      LOG.warn("Error while closing the error/out stream", ioe);
+    }
+    amProc.destroy();
+  }
+
+  public boolean run() throws IOException {
+    LOG.info("Starting Client");
+
+    // Connect to ResourceManager
+    connectToRM();
+    assert (rmClient != null);
+
+    // Get a new application id
+    GetNewApplicationResponse newApp = getApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    // Create launch context for app master
+    LOG.info("Setting up application submission context for ASM");
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName(appName);
+
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(amPriority);
+    appContext.setPriority(pri);
+
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue(amQueue);
+
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+
+    // unmanaged AM
+    appContext.setUnmanagedAM(true);
+    LOG.info("Setting unmanaged AM");
+
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+
+    // Submit the application to the applications manager
+    LOG.info("Submitting application to ASM");
+    rmClient.submitApplication(appRequest);
+
+    // Monitor the application to wait for launch state
+    ApplicationReport appReport = monitorApplication(appId,
+        EnumSet.of(YarnApplicationState.ACCEPTED));
+    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+    LOG.info("Launching application with id: " + attemptId);
+
+    // launch AM
+    launchAM(attemptId);
+
+    // Monitor the application for end state
+    appReport = monitorApplication(appId, EnumSet.of(
+        YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+        YarnApplicationState.FINISHED));
+    YarnApplicationState appState = appReport.getYarnApplicationState();
+    FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
+
+    LOG.info("App ended with state: " + appReport.getYarnApplicationState()
+        + " and status: " + appStatus);
+    if (YarnApplicationState.FINISHED == appState
+        && FinalApplicationStatus.SUCCEEDED == appStatus) {
+      LOG.info("Application has completed successfully.");
+      return true;
+    } else {
+      LOG.info("Application did finished unsuccessfully." + " YarnState="
+          + appState.toString() + ", FinalStatus=" + appStatus.toString());
+      return false;
+    }
+  }
+
+  /**
+   * Monitor the submitted application for completion. Kill application if time
+   * expires.
+   * 
+   * @param appId
+   *          Application Id of application to be monitored
+   * @return true if application completed successfully
+   * @throws YarnRemoteException
+   */
+  private ApplicationReport monitorApplication(ApplicationId appId,
+      Set<YarnApplicationState> finalState) throws YarnRemoteException {
+
+    while (true) {
+
+      // Check app status every 1 second.
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.debug("Thread sleep in monitoring loop interrupted");
+      }
+
+      // Get application report for the appId we are interested in
+      GetApplicationReportRequest reportRequest = Records
+          .newRecord(GetApplicationReportRequest.class);
+      reportRequest.setApplicationId(appId);
+      GetApplicationReportResponse reportResponse = rmClient
+          .getApplicationReport(reportRequest);
+      ApplicationReport report = reportResponse.getApplicationReport();
+
+      LOG.info("Got application report from ASM for" + ", appId="
+          + appId.getId() + ", appAttemptId="
+          + report.getCurrentApplicationAttemptId() + ", clientToken="
+          + report.getClientToken() + ", appDiagnostics="
+          + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
+          + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+          + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
+          + ", yarnAppState=" + report.getYarnApplicationState().toString()
+          + ", distributedFinalState="
+          + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
+          + report.getTrackingUrl() + ", appUser=" + report.getUser());
+
+      YarnApplicationState state = report.getYarnApplicationState();
+      if (finalState.contains(state)) {
+        return report;
+      }
+
+    }
+
+  }
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1365185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Tue Jul 24 17:33:15 2012
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.URL;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestUnmanagedAMLauncher {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestUnmanagedAMLauncher.class);
+
+  protected static MiniYARNCluster yarnCluster = null;
+  protected static Configuration conf = new Configuration();
+
+  @BeforeClass
+  public static void setup() throws InterruptedException, IOException {
+    LOG.info("Starting up YARN cluster");
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
+    if (yarnCluster == null) {
+      yarnCluster = new MiniYARNCluster(
+          TestUnmanagedAMLauncher.class.getName(), 1, 1, 1);
+      yarnCluster.init(conf);
+      yarnCluster.start();
+      URL url = Thread.currentThread().getContextClassLoader()
+          .getResource("yarn-site.xml");
+      if (url == null) {
+        throw new RuntimeException(
+            "Could not find 'yarn-site.xml' dummy file in classpath");
+      }
+      OutputStream os = new FileOutputStream(new File(url.getPath()));
+      yarnCluster.getConfig().writeXml(os);
+      os.close();
+    }
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (yarnCluster != null) {
+      yarnCluster.stop();
+      yarnCluster = null;
+    }
+  }
+
+  private static String getTestRuntimeClasspath() {
+
+    InputStream classpathFileStream = null;
+    BufferedReader reader = null;
+    String envClassPath = "";
+
+    LOG.info("Trying to generate classpath for app master from current thread's classpath");
+    try {
+
+      // Create classpath from generated classpath
+      // Check maven pom.xml for generated classpath info
+      // Works if compile time env is same as runtime. Mainly tests.
+      ClassLoader thisClassLoader = Thread.currentThread()
+          .getContextClassLoader();
+      String generatedClasspathFile = "yarn-apps-am-generated-classpath";
+      classpathFileStream = thisClassLoader
+          .getResourceAsStream(generatedClasspathFile);
+      if (classpathFileStream == null) {
+        LOG.info("Could not classpath resource from class loader");
+        return envClassPath;
+      }
+      LOG.info("Readable bytes from stream=" + classpathFileStream.available());
+      reader = new BufferedReader(new InputStreamReader(classpathFileStream));
+      String cp = reader.readLine();
+      if (cp != null) {
+        envClassPath += cp.trim() + File.pathSeparator;
+      }
+      // yarn-site.xml at this location contains proper config for mini cluster
+      URL url = thisClassLoader.getResource("yarn-site.xml");
+      envClassPath += new File(url.getFile()).getParent();
+    } catch (IOException e) {
+      LOG.info("Could not find the necessary resource to generate class path for tests. Error="
+          + e.getMessage());
+    }
+
+    try {
+      if (classpathFileStream != null) {
+        classpathFileStream.close();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to close class path file stream or reader. Error="
+          + e.getMessage());
+    }
+    return envClassPath;
+  }
+
+  @Test
+  public void testDSShell() throws Exception {
+    String classpath = getTestRuntimeClasspath();
+    String javaHome = System.getenv("JAVA_HOME");
+    if (javaHome == null) {
+      LOG.fatal("JAVA_HOME not defined. Test not running.");
+      return;
+    }
+    // start dist-shell with 0 containers because container launch will fail if 
+    // there are no dist cache resources.
+    String[] args = {
+        "--classpath",
+        classpath,
+        "--cmd",
+        javaHome
+            + "/bin/java -Xmx512m "
+            + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
+            + "--container_memory 128 --num_containers 0 --priority 0 --shell_command ls" };
+
+    LOG.info("Initializing Launcher");
+    UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
+        yarnCluster.getConfig()));
+    boolean initSuccess = launcher.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running Launcher");
+    boolean result = launcher.run();
+
+    LOG.info("Launcher run completed. Result=" + result);
+    Assert.assertTrue(result);
+
+  }
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml?rev=1365185&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml Tue Jul 24 17:33:15 2012
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<configuration>
+  <!-- Dummy (invalid) config file to be overwriten by TestUnmanagedAMLauncher with MiniCluster configuration. -->
+</configuration>
+
+

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml?rev=1365185&r1=1365184&r2=1365185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml Tue Jul 24 17:33:15 2012
@@ -30,6 +30,7 @@
 
   <modules>
     <module>hadoop-yarn-applications-distributedshell</module>
+    <module>hadoop-yarn-applications-unmanaged-am-launcher</module>
   </modules>
  <profiles>
   <profile>