You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by to...@apache.org on 2013/01/23 19:51:28 UTC

svn commit: r1437623 [2/3] - in /hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/...

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Wed Jan 23 18:51:24 2013
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.CommandLine;
@@ -51,9 +50,6 @@ import org.apache.hadoop.yarn.api.Contai
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 
@@ -71,6 +67,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.AMRMClient;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.AMRMClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -78,37 +77,64 @@ import org.apache.hadoop.yarn.util.Conve
 import org.apache.hadoop.yarn.util.Records;
 
 /**
- * An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework. 
+ * An ApplicationMaster for executing shell commands on a set of launched
+ * containers using the YARN framework.
  * 
- * <p>This class is meant to act as an example on how to write yarn-based application masters. </p>
+ * <p>
+ * This class is meant to act as an example on how to write yarn-based
+ * application masters.
+ * </p>
  * 
- * <p> The ApplicationMaster is started on a container by the <code>ResourceManager</code>'s launcher. 
- * The first thing that the <code>ApplicationMaster</code> needs to do is to connect and register itself with 
- * the <code>ResourceManager</code>. The registration sets up information within the <code>ResourceManager</code>
- * regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client
- * as well as a tracking url that a client can use to keep track of status/job history if needed. </p>
+ * <p>
+ * The ApplicationMaster is started on a container by the
+ * <code>ResourceManager</code>'s launcher. The first thing that the
+ * <code>ApplicationMaster</code> needs to do is to connect and register itself
+ * with the <code>ResourceManager</code>. The registration sets up information
+ * within the <code>ResourceManager</code> regarding what host:port the
+ * ApplicationMaster is listening on to provide any form of functionality to a
+ * client as well as a tracking url that a client can use to keep track of
+ * status/job history if needed.
+ * </p>
  * 
- * <p> The <code>ApplicationMaster</code> needs to send a heartbeat to the <code>ResourceManager</code> at regular intervals
- * to inform the <code>ResourceManager</code> that it is up and alive. The {@link AMRMProtocol#allocate} to the 
- * <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts as a heartbeat.
+ * <p>
+ * The <code>ApplicationMaster</code> needs to send a heartbeat to the
+ * <code>ResourceManager</code> at regular intervals to inform the
+ * <code>ResourceManager</code> that it is up and alive. The
+ * {@link AMRMProtocol#allocate} to the <code>ResourceManager</code> from the
+ * <code>ApplicationMaster</code> acts as a heartbeat.
  * 
- * <p> For the actual handling of the job, the <code>ApplicationMaster</code> has to request the 
- * <code>ResourceManager</code> via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest}
- * with the necessary resource specifications such as node location, computational (memory/disk/cpu) resource requirements.
- * The <code>ResourceManager</code> responds with an {@link AllocateResponse} that informs the <code>ApplicationMaster</code> 
- * of the set of newly allocated containers, completed containers as well as current state of available resources. </p>
+ * <p>
+ * For the actual handling of the job, the <code>ApplicationMaster</code> has to
+ * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
+ * required no. of containers using {@link ResourceRequest} with the necessary
+ * resource specifications such as node location, computational
+ * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
+ * responds with an {@link AllocateResponse} that informs the
+ * <code>ApplicationMaster</code> of the set of newly allocated containers,
+ * completed containers as well as current state of available resources.
+ * </p>
  * 
- * <p> For each allocated container, the <code>ApplicationMaster</code> can then set up the necessary launch context via 
- * {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable, 
- * the environment to be setup for the executable, commands to execute, etc. and submit a {@link StartContainerRequest} 
- * to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container. </p>
- *  
- * <p> The <code>ApplicationMaster</code> can monitor the launched container by either querying the <code>ResourceManager</code> 
- * using {@link AMRMProtocol#allocate} to get updates on completed containers or via the {@link ContainerManager} 
- * by querying for the status of the allocated container's {@link ContainerId}.
+ * <p>
+ * For each allocated container, the <code>ApplicationMaster</code> can then set
+ * up the necessary launch context via {@link ContainerLaunchContext} to specify
+ * the allocated container id, local resources required by the executable, the
+ * environment to be setup for the executable, commands to execute, etc. and
+ * submit a {@link StartContainerRequest} to the {@link ContainerManager} to
+ * launch and execute the defined commands on the given allocated container.
+ * </p>
  * 
- * <p> After the job has been completed, the <code>ApplicationMaster</code> has to send a {@link FinishApplicationMasterRequest} 
- * to the <code>ResourceManager</code> to inform it that the <code>ApplicationMaster</code> has been completed. 
+ * <p>
+ * The <code>ApplicationMaster</code> can monitor the launched container by
+ * either querying the <code>ResourceManager</code> using
+ * {@link AMRMProtocol#allocate} to get updates on completed containers or via
+ * the {@link ContainerManager} by querying for the status of the allocated
+ * container's {@link ContainerId}.
+ *
+ * <p>
+ * After the job has been completed, the <code>ApplicationMaster</code> has to
+ * send a {@link FinishApplicationMasterRequest} to the
+ * <code>ResourceManager</code> to inform it that the
+ * <code>ApplicationMaster</code> has been completed.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -116,61 +142,58 @@ public class ApplicationMaster {
 
   private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
 
-  // Configuration 
+  // Configuration
   private Configuration conf;
   // YARN RPC to communicate with the Resource Manager or Node Manager
   private YarnRPC rpc;
 
   // Handle to communicate with the Resource Manager
-  private AMRMProtocol resourceManager;
+  private AMRMClient resourceManager;
 
   // Application Attempt Id ( combination of attemptId and fail count )
   private ApplicationAttemptId appAttemptID;
 
   // TODO
   // For status update for clients - yet to be implemented
-  // Hostname of the container 
+  // Hostname of the container
   private String appMasterHostname = "";
-  // Port on which the app master listens for status update requests from clients
+  // Port on which the app master listens for status updates from clients
   private int appMasterRpcPort = 0;
-  // Tracking url to which app master publishes info for clients to monitor 
+  // Tracking url to which app master publishes info for clients to monitor
   private String appMasterTrackingUrl = "";
 
   // App Master configuration
   // No. of containers to run shell command on
   private int numTotalContainers = 1;
-  // Memory to request for the container on which the shell command will run 
+  // Memory to request for the container on which the shell command will run
   private int containerMemory = 10;
   // Priority of the request
-  private int requestPriority; 
-
-  // Incremental counter for rpc calls to the RM
-  private AtomicInteger rmRequestID = new AtomicInteger();
+  private int requestPriority;
 
   // Simple flag to denote whether all works is done
-  private boolean appDone = false; 
+  private boolean appDone = false;
   // Counter for completed containers ( complete denotes successful or failed )
   private AtomicInteger numCompletedContainers = new AtomicInteger();
   // Allocated container count so that we know how many containers has the RM
   // allocated to us
   private AtomicInteger numAllocatedContainers = new AtomicInteger();
-  // Count of failed containers 
+  // Count of failed containers
   private AtomicInteger numFailedContainers = new AtomicInteger();
   // Count of containers already requested from the RM
-  // Needed as once requested, we should not request for containers again and again. 
-  // Only request for more if the original requirement changes. 
+  // Needed as once requested, we should not request for containers again.
+  // Only request for more if the original requirement changes.
   private AtomicInteger numRequestedContainers = new AtomicInteger();
 
-  // Shell command to be executed 
-  private String shellCommand = ""; 
+  // Shell command to be executed
+  private String shellCommand = "";
   // Args to be passed to the shell command
   private String shellArgs = "";
-  // Env variables to be setup for the shell command 
+  // Env variables to be setup for the shell command
   private Map<String, String> shellEnv = new HashMap<String, String>();
 
   // Location of shell script ( obtained from info set in env )
   // Shell script path in fs
-  private String shellScriptPath = ""; 
+  private String shellScriptPath = "";
   // Timestamp needed for creating a local resource
   private long shellScriptPathTimestamp = 0;
   // File length needed for local resource
@@ -179,9 +202,6 @@ public class ApplicationMaster {
   // Hardcoded path to shell script in launch container's local env
   private final String ExecShellStringPath = "ExecShellScript.sh";
 
-  // Containers to be released
-  private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
-
   // Launch threads
   private List<Thread> launchThreads = new ArrayList<Thread>();
 
@@ -205,8 +225,7 @@ public class ApplicationMaster {
     if (result) {
       LOG.info("Application Master completed successfully. exiting");
       System.exit(0);
-    }
-    else {
+    } else {
       LOG.info("Application Master failed. exiting");
       System.exit(2);
     }
@@ -221,7 +240,8 @@ public class ApplicationMaster {
     Map<String, String> envs = System.getenv();
     for (Map.Entry<String, String> env : envs.entrySet()) {
       LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
-      System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
+      System.out.println("System env: key=" + env.getKey() + ", val="
+          + env.getValue());
     }
 
     String cmd = "ls -al";
@@ -231,9 +251,10 @@ public class ApplicationMaster {
       pr = run.exec(cmd);
       pr.waitFor();
 
-      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+      BufferedReader buf = new BufferedReader(new InputStreamReader(
+          pr.getInputStream()));
       String line = "";
-      while ((line=buf.readLine())!=null) {
+      while ((line = buf.readLine()) != null) {
         LOG.info("System CWD content: " + line);
         System.out.println("System CWD content: " + line);
       }
@@ -242,31 +263,39 @@ public class ApplicationMaster {
       e.printStackTrace();
     } catch (InterruptedException e) {
       e.printStackTrace();
-    } 
+    }
   }
 
   public ApplicationMaster() throws Exception {
     // Set up the configuration and RPC
-    conf = new Configuration();
+    conf = new YarnConfiguration();
     rpc = YarnRPC.create(conf);
   }
+
   /**
    * Parse command line options
-   * @param args Command line args 
-   * @return Whether init successful and run should be invoked 
+   *
+   * @param args Command line args
+   * @return Whether init successful and run should be invoked
    * @throws ParseException
-   * @throws IOException 
+   * @throws IOException
    */
   public boolean init(String[] args) throws ParseException, IOException {
 
     Options opts = new Options();
-    opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
-    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
-    opts.addOption("shell_script", true, "Location of the shell script to be executed");
+    opts.addOption("app_attempt_id", true,
+        "App Attempt ID. Not to be used unless for testing purposes");
+    opts.addOption("shell_command", true,
+        "Shell command to be executed by the Application Master");
+    opts.addOption("shell_script", true,
+        "Location of the shell script to be executed");
     opts.addOption("shell_args", true, "Command line args for the shell script");
-    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
-    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
-    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
+    opts.addOption("shell_env", true,
+        "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("container_memory", true,
+        "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("num_containers", true,
+        "No. of containers on which the shell command needs to be executed");
     opts.addOption("priority", true, "Application Priority. Default 0");
     opts.addOption("debug", false, "Dump out debug information");
 
@@ -275,7 +304,8 @@ public class ApplicationMaster {
 
     if (args.length == 0) {
       printUsage(opts);
-      throw new IllegalArgumentException("No args specified for application master to initialize");
+      throw new IllegalArgumentException(
+          "No args specified for application master to initialize");
     }
 
     if (cliParser.hasOption("help")) {
@@ -289,7 +319,6 @@ public class ApplicationMaster {
 
     Map<String, String> envs = System.getenv();
 
-    appAttemptID = Records.newRecord(ApplicationAttemptId.class);
     if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
       appAttemptID = ConverterUtils.toApplicationAttemptId(envs
           .get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
@@ -297,29 +326,31 @@ public class ApplicationMaster {
       if (cliParser.hasOption("app_attempt_id")) {
         String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
         appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
-      } 
-      else {
-        throw new IllegalArgumentException("Application Attempt Id not set in the environment");
+      } else {
+        throw new IllegalArgumentException(
+            "Application Attempt Id not set in the environment");
       }
     } else {
-      ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
+      ContainerId containerId = ConverterUtils.toContainerId(envs
+          .get(ApplicationConstants.AM_CONTAINER_ID_ENV));
       appAttemptID = containerId.getApplicationAttemptId();
     }
 
-    LOG.info("Application master for app"
-        + ", appId=" + appAttemptID.getApplicationId().getId()
-        + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp()
+    LOG.info("Application master for app" + ", appId="
+        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
+        + appAttemptID.getApplicationId().getClusterTimestamp()
         + ", attemptId=" + appAttemptID.getAttemptId());
 
     if (!cliParser.hasOption("shell_command")) {
-      throw new IllegalArgumentException("No shell command specified to be executed by application master");
+      throw new IllegalArgumentException(
+          "No shell command specified to be executed by application master");
     }
     shellCommand = cliParser.getOptionValue("shell_command");
 
     if (cliParser.hasOption("shell_args")) {
       shellArgs = cliParser.getOptionValue("shell_args");
     }
-    if (cliParser.hasOption("shell_env")) { 
+    if (cliParser.hasOption("shell_env")) {
       String shellEnvs[] = cliParser.getOptionValues("shell_env");
       for (String env : shellEnvs) {
         env = env.trim();
@@ -330,8 +361,8 @@ public class ApplicationMaster {
         }
         String key = env.substring(0, index);
         String val = "";
-        if (index < (env.length()-1)) {
-          val = env.substring(index+1);
+        if (index < (env.length() - 1)) {
+          val = env.substring(index + 1);
         }
         shellEnv.put(key, val);
       }
@@ -341,32 +372,37 @@ public class ApplicationMaster {
       shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
 
       if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
-        shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
+        shellScriptPathTimestamp = Long.valueOf(envs
+            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
       }
       if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
-        shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
+        shellScriptPathLen = Long.valueOf(envs
+            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
       }
 
       if (!shellScriptPath.isEmpty()
-          && (shellScriptPathTimestamp <= 0 
-          || shellScriptPathLen <= 0)) {
-        LOG.error("Illegal values in env for shell script path"
-            + ", path=" + shellScriptPath
-            + ", len=" + shellScriptPathLen
-            + ", timestamp=" + shellScriptPathTimestamp);
-        throw new IllegalArgumentException("Illegal values in env for shell script path");
+          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
+        LOG.error("Illegal values in env for shell script path" + ", path="
+            + shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
+            + shellScriptPathTimestamp);
+        throw new IllegalArgumentException(
+            "Illegal values in env for shell script path");
       }
     }
 
-    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
-    numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
-    requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+    containerMemory = Integer.parseInt(cliParser.getOptionValue(
+        "container_memory", "10"));
+    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
+        "num_containers", "1"));
+    requestPriority = Integer.parseInt(cliParser
+        .getOptionValue("priority", "0"));
 
     return true;
   }
 
   /**
-   * Helper function to print usage 
+   * Helper function to print usage
+   *
    * @param opts Parsed command line options
    */
   private void printUsage(Options opts) {
@@ -375,228 +411,240 @@ public class ApplicationMaster {
 
   /**
    * Main run function for the application master
+   *
    * @throws YarnRemoteException
    */
   public boolean run() throws YarnRemoteException {
     LOG.info("Starting ApplicationMaster");
 
     // Connect to ResourceManager
-    resourceManager = connectToRM();
+    resourceManager = new AMRMClientImpl(appAttemptID);
+    resourceManager.init(conf);
+    resourceManager.start();
 
-    // Setup local RPC Server to accept status requests directly from clients 
-    // TODO need to setup a protocol for client to be able to communicate to the RPC server 
-    // TODO use the rpc port info to register with the RM for the client to send requests to this app master
-
-    // Register self with ResourceManager 
-    RegisterApplicationMasterResponse response = registerToRM();
-    // Dump out information about cluster capability as seen by the resource manager
-    int minMem = response.getMinimumResourceCapability().getMemory();
-    int maxMem = response.getMaximumResourceCapability().getMemory();
-    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
-    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
-    // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 
-    // a multiple of the min value and cannot exceed the max. 
-    // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
-    if (containerMemory < minMem) {
-      LOG.info("Container memory specified below min threshold of cluster. Using min value."
-          + ", specified=" + containerMemory
-          + ", min=" + minMem);
-      containerMemory = minMem; 
-    } 
-    else if (containerMemory > maxMem) {
-      LOG.info("Container memory specified above max threshold of cluster. Using max value."
-          + ", specified=" + containerMemory
-          + ", max=" + maxMem);
-      containerMemory = maxMem;
-    }
-
-    // Setup heartbeat emitter
-    // TODO poll RM every now and then with an empty request to let RM know that we are alive
-    // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: 
-    // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
-    // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter 
-    // is not required.
-
-    // Setup ask for containers from RM
-    // Send request for containers to RM
-    // Until we get our fully allocated quota, we keep on polling RM for containers
-    // Keep looping until all the containers are launched and shell script executed on them 
-    // ( regardless of success/failure). 
-
-    int loopCounter = -1;
-
-    while (numCompletedContainers.get() < numTotalContainers
-        && !appDone) {
-      loopCounter++;
-
-      // log current state
-      LOG.info("Current application state: loop=" + loopCounter 
-          + ", appDone=" + appDone
-          + ", total=" + numTotalContainers
-          + ", requested=" + numRequestedContainers
-          + ", completed=" + numCompletedContainers
-          + ", failed=" + numFailedContainers
-          + ", currentAllocated=" + numAllocatedContainers);
-
-      // Sleep before each loop when asking RM for containers
-      // to avoid flooding RM with spurious requests when it 
-      // need not have any available containers 
-      // Sleeping for 1000 ms.
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        LOG.info("Sleep interrupted " + e.getMessage());
-      }
-
-      // No. of containers to request 
-      // For the first loop, askCount will be equal to total containers needed 
-      // From that point on, askCount will always be 0 as current implementation 
-      // does not change its ask on container failures. 
-      int askCount = numTotalContainers - numRequestedContainers.get();
-      numRequestedContainers.addAndGet(askCount);
-
-      // Setup request to be sent to RM to allocate containers
-      List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
-      if (askCount > 0) {
-        ResourceRequest containerAsk = setupContainerAskForRM(askCount);
-        resourceReq.add(containerAsk);
-      }
-
-      // Send the request to RM 
-      LOG.info("Asking RM for containers"
-          + ", askCount=" + askCount);
-      AMResponse amResp =sendContainerAskToRM(resourceReq);
-
-      // Retrieve list of allocated containers from the response 
-      List<Container> allocatedContainers = amResp.getAllocatedContainers();
-      LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
-      numAllocatedContainers.addAndGet(allocatedContainers.size());
-      for (Container allocatedContainer : allocatedContainers) {
-        LOG.info("Launching shell command on a new container."
-            + ", containerId=" + allocatedContainer.getId()
-            + ", containerNode=" + allocatedContainer.getNodeId().getHost() 
-            + ":" + allocatedContainer.getNodeId().getPort()
-            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-            + ", containerState" + allocatedContainer.getState()
-            + ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
-        //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
-
-        LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
-        Thread launchThread = new Thread(runnableLaunchContainer);
-
-        // launch and start the container on a separate thread to keep the main thread unblocked
-        // as all containers may not be allocated at one go.
-        launchThreads.add(launchThread);
-        launchThread.start();
-      }
-
-      // Check what the current available resources in the cluster are
-      // TODO should we do anything if the available resources are not enough? 
-      Resource availableResources = amResp.getAvailableResources();
-      LOG.info("Current available resources in the cluster " + availableResources);
-
-      // Check the completed containers
-      List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
-      LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
-      for (ContainerStatus containerStatus : completedContainers) {
-        LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
-            + ", state=" + containerStatus.getState()
-            + ", exitStatus=" + containerStatus.getExitStatus() 
-            + ", diagnostics=" + containerStatus.getDiagnostics());
-
-        // non complete containers should not be here 
-        assert(containerStatus.getState() == ContainerState.COMPLETE);
-
-        // increment counters for completed/failed containers
-        int exitStatus = containerStatus.getExitStatus();
-        if (0 != exitStatus) {
-          // container failed 
-          if (-100 != exitStatus) {
-            // shell script failed
-            // counts as completed 
+    try {
+      // Setup local RPC Server to accept status requests directly from clients
+      // TODO need to setup a protocol for client to be able to communicate to
+      // the RPC server
+      // TODO use the rpc port info to register with the RM for the client to
+      // send requests to this app master
+
+      // Register self with ResourceManager
+      RegisterApplicationMasterResponse response = resourceManager
+          .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+              appMasterTrackingUrl);
+      // Dump out information about cluster capability as seen by the
+      // resource manager
+      int minMem = response.getMinimumResourceCapability().getMemory();
+      int maxMem = response.getMaximumResourceCapability().getMemory();
+      LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+      LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+      // A resource ask has to be at least the minimum of the capability of the
+      // cluster, the value has to be a multiple of the min value and cannot
+      // exceed the max.
+      // If it is not an exact multiple of min, the RM will allocate to the
+      // nearest multiple of min
+      if (containerMemory < minMem) {
+        LOG.info("Container memory specified below min threshold of cluster."
+            + " Using min value." + ", specified=" + containerMemory + ", min="
+            + minMem);
+        containerMemory = minMem;
+      } else if (containerMemory > maxMem) {
+        LOG.info("Container memory specified above max threshold of cluster."
+            + " Using max value." + ", specified=" + containerMemory + ", max="
+            + maxMem);
+        containerMemory = maxMem;
+      }
+
+      // Setup heartbeat emitter
+      // TODO poll RM every now and then with an empty request to let RM know
+      // that we are alive
+      // The heartbeat interval after which an AM is timed out by the RM is
+      // defined by a config setting:
+      // RM_AM_EXPIRY_INTERVAL_MS with default defined by
+      // DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
+      // The allocate calls to the RM count as heartbeats so, for now,
+      // this additional heartbeat emitter is not required.
+
+      // Setup ask for containers from RM
+      // Send request for containers to RM
+      // Until we get our fully allocated quota, we keep on polling RM for
+      // containers
+      // Keep looping until all the containers are launched and shell script
+      // executed on them ( regardless of success/failure).
+
+      int loopCounter = -1;
+
+      while (numCompletedContainers.get() < numTotalContainers && !appDone) {
+        loopCounter++;
+
+        // log current state
+        LOG.info("Current application state: loop=" + loopCounter
+            + ", appDone=" + appDone + ", total=" + numTotalContainers
+            + ", requested=" + numRequestedContainers + ", completed="
+            + numCompletedContainers + ", failed=" + numFailedContainers
+            + ", currentAllocated=" + numAllocatedContainers);
+
+        // Sleep before each loop when asking RM for containers
+        // to avoid flooding RM with spurious requests when it
+        // may not have any available containers
+        // Sleeping for 1000 ms.
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep interrupted " + e.getMessage());
+        }
+
+        // No. of containers to request
+        // For the first loop, askCount will be equal to total containers needed
+        // From that point on, askCount is 0 unless a container was lost (exit
+        // status -100); shell script failures do not change the ask.
+        int askCount = numTotalContainers - numRequestedContainers.get();
+        numRequestedContainers.addAndGet(askCount);
+
+        if (askCount > 0) {
+          ContainerRequest containerAsk = setupContainerAskForRM(askCount);
+          resourceManager.addContainerRequest(containerAsk);
+        }
+
+        // Send the request to RM
+        LOG.info("Asking RM for containers" + ", askCount=" + askCount);
+        AMResponse amResp = sendContainerAskToRM();
+
+        // Retrieve list of allocated containers from the response
+        List<Container> allocatedContainers = amResp.getAllocatedContainers();
+        LOG.info("Got response from RM for container ask, allocatedCnt="
+            + allocatedContainers.size());
+        numAllocatedContainers.addAndGet(allocatedContainers.size());
+        for (Container allocatedContainer : allocatedContainers) {
+          LOG.info("Launching shell command on a new container."
+              + ", containerId=" + allocatedContainer.getId()
+              + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+              + ":" + allocatedContainer.getNodeId().getPort()
+              + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+              + ", containerState" + allocatedContainer.getState()
+              + ", containerResourceMemory"
+              + allocatedContainer.getResource().getMemory());
+          // + ", containerToken"
+          // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+          LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+              allocatedContainer);
+          Thread launchThread = new Thread(runnableLaunchContainer);
+
+          // launch and start the container on a separate thread to keep
+          // the main thread unblocked
+          // as all containers may not be allocated at one go.
+          launchThreads.add(launchThread);
+          launchThread.start();
+        }
+
+        // Check what the current available resources in the cluster are
+        // TODO should we do anything if the available resources are not enough?
+        Resource availableResources = amResp.getAvailableResources();
+        LOG.info("Current available resources in the cluster "
+            + availableResources);
+
+        // Check the completed containers
+        List<ContainerStatus> completedContainers = amResp
+            .getCompletedContainersStatuses();
+        LOG.info("Got response from RM for container ask, completedCnt="
+            + completedContainers.size());
+        for (ContainerStatus containerStatus : completedContainers) {
+          LOG.info("Got container status for containerID="
+              + containerStatus.getContainerId() + ", state="
+              + containerStatus.getState() + ", exitStatus="
+              + containerStatus.getExitStatus() + ", diagnostics="
+              + containerStatus.getDiagnostics());
+
+          // non complete containers should not be here
+          assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+          // increment counters for completed/failed containers
+          int exitStatus = containerStatus.getExitStatus();
+          if (0 != exitStatus) {
+            // container failed
+            if (-100 != exitStatus) {
+              // shell script failed
+              // counts as completed
+              numCompletedContainers.incrementAndGet();
+              numFailedContainers.incrementAndGet();
+            } else {
+              // something else bad happened
+              // app job did not complete for some reason
+              // we should re-try as the container was lost for some reason
+              numAllocatedContainers.decrementAndGet();
+              numRequestedContainers.decrementAndGet();
+              // we do not need to release the container as it would be done
+              // by the RM/CM.
+            }
+          } else {
+            // nothing to do
+            // container completed successfully
             numCompletedContainers.incrementAndGet();
-            numFailedContainers.incrementAndGet();
-          }
-          else { 
-            // something else bad happened 
-            // app job did not complete for some reason 
-            // we should re-try as the container was lost for some reason
-            numAllocatedContainers.decrementAndGet();
-            numRequestedContainers.decrementAndGet();
-            // we do not need to release the container as it would be done
-            // by the RM/CM.
+            LOG.info("Container completed successfully." + ", containerId="
+                + containerStatus.getContainerId());
           }
         }
-        else { 
-          // nothing to do 
-          // container completed successfully 
-          numCompletedContainers.incrementAndGet();
-          LOG.info("Container completed successfully."
-              + ", containerId=" + containerStatus.getContainerId());
+        if (numCompletedContainers.get() == numTotalContainers) {
+          appDone = true;
         }
 
-      }
-      if (numCompletedContainers.get() == numTotalContainers) {
-        appDone = true;
+        LOG.info("Current application state: loop=" + loopCounter
+            + ", appDone=" + appDone + ", total=" + numTotalContainers
+            + ", requested=" + numRequestedContainers + ", completed="
+            + numCompletedContainers + ", failed=" + numFailedContainers
+            + ", currentAllocated=" + numAllocatedContainers);
+
+        // TODO
+        // Add a timeout handling layer
+        // for misbehaving shell commands
       }
 
-      LOG.info("Current application state: loop=" + loopCounter
-          + ", appDone=" + appDone
-          + ", total=" + numTotalContainers
-          + ", requested=" + numRequestedContainers
-          + ", completed=" + numCompletedContainers
-          + ", failed=" + numFailedContainers
-          + ", currentAllocated=" + numAllocatedContainers);
-
-      // TODO 
-      // Add a timeout handling layer 
-      // for misbehaving shell commands
-    }
-
-    // Join all launched threads
-    // needed for when we time out 
-    // and we need to release containers
-    for (Thread launchThread : launchThreads) {
-      try {
-        launchThread.join(10000);
-      } catch (InterruptedException e) {
-        LOG.info("Exception thrown in thread join: " + e.getMessage());
-        e.printStackTrace();
+      // Join all launched threads
+      // needed for when we time out
+      // and we need to release containers
+      for (Thread launchThread : launchThreads) {
+        try {
+          launchThread.join(10000);
+        } catch (InterruptedException e) {
+          LOG.info("Exception thrown in thread join: " + e.getMessage());
+          e.printStackTrace();
+        }
       }
-    }
 
-    // When the application completes, it should send a finish application signal 
-    // to the RM
-    LOG.info("Application completed. Signalling finish to RM");
-
-    FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
-    finishReq.setAppAttemptId(appAttemptID);
-    boolean isSuccess = true;
-    if (numFailedContainers.get() == 0) {
-      finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
-    }
-    else {
-      finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
-      String diagnostics = "Diagnostics."
-          + ", total=" + numTotalContainers
-          + ", completed=" + numCompletedContainers.get()
-          + ", allocated=" + numAllocatedContainers.get()
-          + ", failed=" + numFailedContainers.get();
-      finishReq.setDiagnostics(diagnostics);
-      isSuccess = false;
+      // When the application completes, it should send a finish application
+      // signal to the RM
+      LOG.info("Application completed. Signalling finish to RM");
+
+      FinalApplicationStatus appStatus;
+      String appMessage = null;
+      boolean isSuccess = true;
+      if (numFailedContainers.get() == 0) {
+        appStatus = FinalApplicationStatus.SUCCEEDED;
+      } else {
+        appStatus = FinalApplicationStatus.FAILED;
+        appMessage = "Diagnostics." + ", total=" + numTotalContainers
+            + ", completed=" + numCompletedContainers.get() + ", allocated="
+            + numAllocatedContainers.get() + ", failed="
+            + numFailedContainers.get();
+        isSuccess = false;
+      }
+      resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+      return isSuccess;
+    } finally {
+      resourceManager.stop();
     }
-    resourceManager.finishApplicationMaster(finishReq);
-    return isSuccess;
   }
 
   /**
-   * Thread to connect to the {@link ContainerManager} and 
-   * launch the container that will execute the shell command. 
+   * Thread to connect to the {@link ContainerManager} and launch the container
+   * that will execute the shell command.
    */
   private class LaunchContainerRunnable implements Runnable {
 
-    // Allocated container 
+    // Allocated container
     Container container;
     // Handle to communicate with ContainerManager
     ContainerManager cm;
@@ -612,15 +660,16 @@ public class ApplicationMaster {
      * Helper function to connect to CM
      */
     private void connectToCM() {
-      LOG.debug("Connecting to ContainerManager for containerid=" + container.getId());
+      LOG.debug("Connecting to ContainerManager for containerid="
+          + container.getId());
       String cmIpPortStr = container.getNodeId().getHost() + ":"
           + container.getNodeId().getPort();
       InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
       LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
-      this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
+      this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class,
+          cmAddress, conf));
     }
 
-
     @Override
     /**
      * Connects to CM, sets up container launch context 
@@ -628,11 +677,13 @@ public class ApplicationMaster {
      * start request to the CM. 
      */
     public void run() {
-      // Connect to ContainerManager 
+      // Connect to ContainerManager
       connectToCM();
 
-      LOG.info("Setting up container launch container for containerid=" + container.getId());
-      ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+      LOG.info("Setting up container launch container for containerid="
+          + container.getId());
+      ContainerLaunchContext ctx = Records
+          .newRecord(ContainerLaunchContext.class);
 
       ctx.setContainerId(container.getId());
       ctx.setResource(container.getResource());
@@ -642,28 +693,30 @@ public class ApplicationMaster {
       ctx.setUser(jobUserName);
       LOG.info("Setting user in ContainerLaunchContext to: " + jobUserName);
 
-      // Set the environment 
+      // Set the environment
       ctx.setEnvironment(shellEnv);
 
-      // Set the local resources 
+      // Set the local resources
       Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
 
-      // The container for the eventual shell commands needs its own local resources too. 
-      // In this scenario, if a shell script is specified, we need to have it copied 
-      // and made available to the container. 
+      // The container for the eventual shell commands needs its own local
+      // resources too.
+      // In this scenario, if a shell script is specified, we need to have it
+      // copied and made available to the container.
       if (!shellScriptPath.isEmpty()) {
         LocalResource shellRsrc = Records.newRecord(LocalResource.class);
         shellRsrc.setType(LocalResourceType.FILE);
         shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
         try {
-          shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
+          shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
+              shellScriptPath)));
         } catch (URISyntaxException e) {
-          LOG.error("Error when trying to use shell script path specified in env"
-              + ", path=" + shellScriptPath);
+          LOG.error("Error when trying to use shell script path specified"
+              + " in env, path=" + shellScriptPath);
           e.printStackTrace();
 
-          // A failure scenario on bad input such as invalid shell script path 
-          // We know we cannot continue launching the container 
+          // A failure scenario on bad input such as invalid shell script path
+          // We know we cannot continue launching the container
           // so we should release it.
           // TODO
           numCompletedContainers.incrementAndGet();
@@ -676,12 +729,12 @@ public class ApplicationMaster {
       }
       ctx.setLocalResources(localResources);
 
-      // Set the necessary command to execute on the allocated container 
+      // Set the necessary command to execute on the allocated container
       Vector<CharSequence> vargs = new Vector<CharSequence>(5);
 
-      // Set executable command 
+      // Set executable command
       vargs.add(shellCommand);
-      // Set shell script path 
+      // Set shell script path
       if (!shellScriptPath.isEmpty()) {
         vargs.add(ExecShellStringPath);
       }
@@ -689,11 +742,6 @@ public class ApplicationMaster {
       // Set args for the shell command if any
       vargs.add(shellArgs);
       // Add log redirect params
-      // TODO
-      // We should redirect the output to hdfs instead of local logs 
-      // so as to be able to look at the final output after the containers 
-      // have been released. 
-      // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err] 
       vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
       vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
 
@@ -707,131 +755,78 @@ public class ApplicationMaster {
       commands.add(command.toString());
       ctx.setCommands(commands);
 
-      StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+      StartContainerRequest startReq = Records
+          .newRecord(StartContainerRequest.class);
       startReq.setContainerLaunchContext(ctx);
       try {
         cm.startContainer(startReq);
       } catch (YarnRemoteException e) {
-        LOG.info("Start container failed for :"
-            + ", containerId=" + container.getId());
+        LOG.info("Start container failed for :" + ", containerId="
+            + container.getId());
         e.printStackTrace();
-        // TODO do we need to release this container? 
+        // TODO do we need to release this container?
       }
 
       // Get container status?
-      // Left commented out as the shell scripts are short lived 
-      // and we are relying on the status for completed containers from RM to detect status
-
-      //    GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
-      //    statusReq.setContainerId(container.getId());
-      //    GetContainerStatusResponse statusResp;
-      //try {
-      //statusResp = cm.getContainerStatus(statusReq);
-      //    LOG.info("Container Status"
-      //    + ", id=" + container.getId()
-      //    + ", status=" +statusResp.getStatus());
-      //} catch (YarnRemoteException e) {
-      //e.printStackTrace();
-      //}
+      // Left commented out as the shell scripts are short lived
+      // and we are relying on the status for completed containers
+      // from RM to detect status
+
+      // GetContainerStatusRequest statusReq =
+      // Records.newRecord(GetContainerStatusRequest.class);
+      // statusReq.setContainerId(container.getId());
+      // GetContainerStatusResponse statusResp;
+      // try {
+      // statusResp = cm.getContainerStatus(statusReq);
+      // LOG.info("Container Status"
+      // + ", id=" + container.getId()
+      // + ", status=" +statusResp.getStatus());
+      // } catch (YarnRemoteException e) {
+      // e.printStackTrace();
+      // }
     }
   }
 
   /**
-   * Connect to the Resource Manager
-   * @return Handle to communicate with the RM
-   */
-  private AMRMProtocol connectToRM() {
-    YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    InetSocketAddress rmAddress = yarnConf.getSocketAddr(
-        YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
-  }
-
-  /** 
-   * Register the Application Master to the Resource Manager
-   * @return the registration response from the RM
-   * @throws YarnRemoteException
-   */
-  private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
-    RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
-
-    // set the required info into the registration request: 
-    // application attempt id, 
-    // host on which the app master is running
-    // rpc port on which the app master accepts requests from the client 
-    // tracking url for the app master
-    appMasterRequest.setApplicationAttemptId(appAttemptID);
-    appMasterRequest.setHost(appMasterHostname);
-    appMasterRequest.setRpcPort(appMasterRpcPort);
-    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-
-    return resourceManager.registerApplicationMaster(appMasterRequest);
-  }
-
-  /**
    * Setup the request that will be sent to the RM for the container ask.
+   *
    * @param numContainers Containers to ask for from RM
    * @return the setup ResourceRequest to be sent to RM
    */
-  private ResourceRequest setupContainerAskForRM(int numContainers) {
-    ResourceRequest request = Records.newRecord(ResourceRequest.class);
-
-    // setup requirements for hosts 
-    // whether a particular rack/host is needed 
-    // Refer to apis under org.apache.hadoop.net for more 
-    // details on how to get figure out rack/host mapping.
+  private ContainerRequest setupContainerAskForRM(int numContainers) {
+    // setup requirements for hosts
     // using * as any host will do for the distributed shell app
-    request.setHostName("*");
-
-    // set no. of containers needed
-    request.setNumContainers(numContainers);
-
     // set the priority for the request
     Priority pri = Records.newRecord(Priority.class);
-    // TODO - what is the range for priority? how to decide? 
+    // TODO - what is the range for priority? how to decide?
     pri.setPriority(requestPriority);
-    request.setPriority(pri);
 
     // Set up resource type requirements
     // For now, only memory is supported so we set memory requirements
     Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(containerMemory);
-    request.setCapability(capability);
 
+    ContainerRequest request = new ContainerRequest(capability, null, null,
+        pri, numContainers);
+    LOG.info("Requested container ask: " + request.toString());
     return request;
   }
 
   /**
    * Ask RM to allocate given no. of containers to this Application Master
+   *
    * @param requestedContainers Containers to ask for from RM
-   * @return Response from RM to AM with allocated containers 
+   * @return Response from RM to AM with allocated containers
    * @throws YarnRemoteException
    */
-  private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
-      throws YarnRemoteException {
-    AllocateRequest req = Records.newRecord(AllocateRequest.class);
-    req.setResponseId(rmRequestID.incrementAndGet());
-    req.setApplicationAttemptId(appAttemptID);
-    req.addAllAsks(requestedContainers);
-    req.addAllReleases(releasedContainers);
-    req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
-
-    LOG.info("Sending request to RM for containers"
-        + ", requestedSet=" + requestedContainers.size()
-        + ", releasedSet=" + releasedContainers.size()
-        + ", progress=" + req.getProgress());
+  private AMResponse sendContainerAskToRM() throws YarnRemoteException {
+    float progressIndicator = (float) numCompletedContainers.get()
+        / numTotalContainers;
 
-    for (ResourceRequest  rsrcReq : requestedContainers) {
-      LOG.info("Requested container ask: " + rsrcReq.toString());
-    }
-    for (ContainerId id : releasedContainers) {
-      LOG.info("Released container, id=" + id.getId());
-    }
+    LOG.info("Sending request to RM for containers" + ", progress="
+        + progressIndicator);
 
-    AllocateResponse resp = resourceManager.allocate(req);
+    AllocateResponse resp = resourceManager.allocate(progressIndicator);
     return resp.getAMResponse();
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Wed Jan 23 18:51:24 2013
@@ -18,10 +18,7 @@
 
 package org.apache.hadoop.yarn.applications.distributedshell;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -212,7 +209,7 @@ public class Client extends YarnClientIm
   /**
    */
   public Client() throws Exception  {
-    this(new Configuration());
+    this(new YarnConfiguration());
   }
 
   /**

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Wed Jan 23 18:51:24 2013
@@ -120,6 +120,7 @@ public class TestDistributedShell {
     boolean exceptionThrown = false;
     try {
       boolean initSuccess = client.init(args);
+      Assert.assertTrue(initSuccess);
     }
     catch (IllegalArgumentException e) {
       exceptionThrown = true;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml Wed Jan 23 18:51:24 2013
@@ -45,6 +45,29 @@
   </dependencies>
 
   <build>
+    <!--
+    Include all files in src/main/resources.  By default, do not apply property
+    substitution (filtering=false), but do apply property substitution to
+    yarn-version-info.properties (filtering=true).  This will substitute the
+    version information correctly, but prevent Maven from altering other files
+    like yarn-default.xml.
+    -->
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <excludes>
+          <exclude>yarn-version-info.properties</exclude>
+        </excludes>
+        <filtering>false</filtering>
+      </resource>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <includes>
+          <include>yarn-version-info.properties</include>
+        </includes>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
     <plugins>
      <plugin>
         <groupId>org.apache.rat</groupId>
@@ -65,6 +88,27 @@
         </configuration>
       </plugin>
       <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>version-info</id>
+            <goals>
+              <goal>version-info</goal>
+            </goals>
+            <configuration>
+              <source>
+                <directory>${basedir}/src/main</directory>
+                <includes>
+                  <include>java/**/*.java</include>
+                  <include>proto/**/*.proto</include>
+                </includes>
+              </source>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <artifactId>maven-jar-plugin</artifactId>
         <executions>
           <execution>
@@ -127,20 +171,6 @@
               <goal>exec</goal>
             </goals>
           </execution>
-          <execution>
-            <id>generate-version</id>
-            <phase>generate-sources</phase>
-            <configuration>
-              <executable>scripts/saveVersion.sh</executable>
-              <arguments>
-                <argument>${project.version}</argument>
-                <argument>${project.build.directory}</argument>
-              </arguments>
-            </configuration>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-          </execution>
         </executions>
       </plugin>
 

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Jan 23 18:51:24 2013
@@ -236,6 +236,10 @@ public class YarnConfiguration extends C
   /** The class to use as the persistent store.*/
   public static final String RM_STORE = RM_PREFIX + "store.class";
   
+  /** URI for FileSystemRMStateStore */
+  public static final String FS_RM_STATE_STORE_URI =
+                                           RM_PREFIX + "fs.rm-state-store.uri";
+
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =
     RM_PREFIX + "max-completed-applications";

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/BaseClientToAMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/BaseClientToAMTokenSecretManager.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/BaseClientToAMTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/BaseClientToAMTokenSecretManager.java Wed Jan 23 18:51:24 2013
@@ -21,24 +21,25 @@ package org.apache.hadoop.yarn.security.
 import javax.crypto.SecretKey;
 
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 
 public abstract class BaseClientToAMTokenSecretManager extends
     SecretManager<ClientTokenIdentifier> {
 
-  public abstract SecretKey getMasterKey(ApplicationId applicationId);
+  public abstract SecretKey getMasterKey(
+      ApplicationAttemptId applicationAttemptId);
 
   @Override
   public synchronized byte[] createPassword(
       ClientTokenIdentifier identifier) {
     return createPassword(identifier.getBytes(),
-      getMasterKey(identifier.getApplicationID()));
+      getMasterKey(identifier.getApplicationAttemptID()));
   }
 
   @Override
   public byte[] retrievePassword(ClientTokenIdentifier identifier)
       throws SecretManager.InvalidToken {
-    SecretKey masterKey = getMasterKey(identifier.getApplicationID());
+    SecretKey masterKey = getMasterKey(identifier.getApplicationAttemptID());
     if (masterKey == null) {
       throw new SecretManager.InvalidToken("Illegal client-token!");
     }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientToAMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientToAMTokenSecretManager.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientToAMTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientToAMTokenSecretManager.java Wed Jan 23 18:51:24 2013
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.security.
 import javax.crypto.SecretKey;
 
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 
 public class ClientToAMTokenSecretManager extends
     BaseClientToAMTokenSecretManager {
@@ -29,14 +29,14 @@ public class ClientToAMTokenSecretManage
   // Only one client-token and one master-key for AM
   private final SecretKey masterKey;
 
-  public ClientToAMTokenSecretManager(ApplicationId applicationID,
-      byte[] secretKeyBytes) {
+  public ClientToAMTokenSecretManager(
+      ApplicationAttemptId applicationAttemptID, byte[] secretKeyBytes) {
     super();
     this.masterKey = SecretManager.createSecretKey(secretKeyBytes);
   }
 
   @Override
-  public SecretKey getMasterKey(ApplicationId applicationID) {
+  public SecretKey getMasterKey(ApplicationAttemptId applicationAttemptID) {
     // Only one client-token and one master-key for AM, just return that.
     return this.masterKey;
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientTokenIdentifier.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientTokenIdentifier.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientTokenIdentifier.java Wed Jan 23 18:51:24 2013
@@ -27,14 +27,14 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
 public class ClientTokenIdentifier extends TokenIdentifier {
 
   public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
 
-  private ApplicationId applicationId;
+  private ApplicationAttemptId applicationAttemptId;
 
   // TODO: Add more information in the tokenID such that it is not
   // transferrable, more secure etc.
@@ -42,25 +42,29 @@ public class ClientTokenIdentifier exten
   public ClientTokenIdentifier() {
   }
 
-  public ClientTokenIdentifier(ApplicationId id) {
+  public ClientTokenIdentifier(ApplicationAttemptId id) {
     this();
-    this.applicationId = id;
+    this.applicationAttemptId = id;
   }
 
-  public ApplicationId getApplicationID() {
-    return this.applicationId;
+  public ApplicationAttemptId getApplicationAttemptID() {
+    return this.applicationAttemptId;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeLong(this.applicationId.getClusterTimestamp());
-    out.writeInt(this.applicationId.getId());
+    out.writeLong(this.applicationAttemptId.getApplicationId()
+      .getClusterTimestamp());
+    out.writeInt(this.applicationAttemptId.getApplicationId().getId());
+    out.writeInt(this.applicationAttemptId.getAttemptId());
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.applicationId =
-        BuilderUtils.newApplicationId(in.readLong(), in.readInt());
+    this.applicationAttemptId =
+        BuilderUtils.newApplicationAttemptId(
+          BuilderUtils.newApplicationId(in.readLong(), in.readInt()),
+          in.readInt());
   }
 
   @Override
@@ -70,10 +74,10 @@ public class ClientTokenIdentifier exten
 
   @Override
   public UserGroupInformation getUser() {
-    if (this.applicationId == null) {
+    if (this.applicationAttemptId == null) {
       return null;
     }
-    return UserGroupInformation.createRemoteUser(this.applicationId.toString());
+    return UserGroupInformation.createRemoteUser(this.applicationAttemptId.toString());
   }
 
   @InterfaceAudience.Private

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Wed Jan 23 18:51:24 2013
@@ -25,10 +25,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -37,12 +33,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ClientToken;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -52,9 +50,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -256,30 +254,36 @@ public class BuilderUtils {
     return container;
   }
 
-  public static DelegationToken newDelegationToken(
-      byte[] identifier, String kind, byte[] password,
-      String service) {
-      DelegationToken delegationToken = recordFactory.newRecordInstance(
-          DelegationToken.class);
-      delegationToken.setIdentifier(ByteBuffer.wrap(identifier));
-      delegationToken.setKind(kind);
-      delegationToken.setPassword(ByteBuffer.wrap(password));
-      delegationToken.setService(service);
-      return delegationToken;
+  public static <T extends Token> T newToken(Class<T> tokenClass,
+      byte[] identifier, String kind, byte[] password, String service) {
+    T token = recordFactory.newRecordInstance(tokenClass);
+    token.setIdentifier(ByteBuffer.wrap(identifier));
+    token.setKind(kind);
+    token.setPassword(ByteBuffer.wrap(password));
+    token.setService(service);
+    return token;
+  }
+
+  public static DelegationToken newDelegationToken(byte[] identifier,
+      String kind, byte[] password, String service) {
+    return newToken(DelegationToken.class, identifier, kind, password, service);
+  }
+
+  public static ClientToken newClientToken(byte[] identifier, String kind,
+      byte[] password, String service) {
+    return newToken(ClientToken.class, identifier, kind, password, service);
   }
-  
+
   public static ContainerToken newContainerToken(NodeId nodeId,
-      ByteBuffer password, ContainerTokenIdentifier tokenIdentifier) {
-    ContainerToken containerToken = recordFactory
-        .newRecordInstance(ContainerToken.class);
-    containerToken.setIdentifier(ByteBuffer.wrap(tokenIdentifier.getBytes()));
-    containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
-    containerToken.setPassword(password);
+      byte[] password, ContainerTokenIdentifier tokenIdentifier) {
     // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
-        nodeId.getPort());
-    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token 
-    containerToken.setService(SecurityUtil.buildTokenService(addr).toString());
+    InetSocketAddress addr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    ContainerToken containerToken =
+        newToken(ContainerToken.class, tokenIdentifier.getBytes(),
+          ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
     return containerToken;
   }
 
@@ -333,7 +337,7 @@ public class BuilderUtils {
   public static ApplicationReport newApplicationReport(
       ApplicationId applicationId, ApplicationAttemptId applicationAttemptId,
       String user, String queue, String name, String host, int rpcPort,
-      String clientToken, YarnApplicationState state, String diagnostics,
+      ClientToken clientToken, YarnApplicationState state, String diagnostics,
       String url, long startTime, long finishTime,
       FinalApplicationStatus finalStatus,
       ApplicationResourceUsageReport appResources, String origTrackingUrl) {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/YarnVersionInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/YarnVersionInfo.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/YarnVersionInfo.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/YarnVersionInfo.java Wed Jan 23 18:51:24 2013
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.util;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.YarnVersionAnnotation;
+import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -30,31 +30,20 @@ import org.apache.hadoop.classification.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class YarnVersionInfo {
+public class YarnVersionInfo extends VersionInfo {
   private static final Log LOG = LogFactory.getLog(YarnVersionInfo.class);
 
-  private static Package myPackage;
-  private static YarnVersionAnnotation version;
-  
-  static {
-    myPackage = YarnVersionAnnotation.class.getPackage();
-    version = myPackage.getAnnotation(YarnVersionAnnotation.class);
-  }
+  private static YarnVersionInfo YARN_VERSION_INFO = new YarnVersionInfo();
 
-  /**
-   * Get the meta-data for the Yarn package.
-   * @return
-   */
-  static Package getPackage() {
-    return myPackage;
+  protected YarnVersionInfo() {
+    super("yarn");
   }
-  
   /**
    * Get the Yarn version.
    * @return the Yarn version string, eg. "0.6.3-dev"
    */
   public static String getVersion() {
-    return version != null ? version.version() : "Unknown";
+    return YARN_VERSION_INFO._getVersion();
   }
   
   /**
@@ -62,7 +51,7 @@ public class YarnVersionInfo {
    * @return the revision number, eg. "451451"
    */
   public static String getRevision() {
-    return version != null ? version.revision() : "Unknown";
+    return YARN_VERSION_INFO._getRevision();
   }
 
   /**
@@ -70,7 +59,7 @@ public class YarnVersionInfo {
    * @return The branch name, e.g. "trunk" or "branches/branch-0.20"
    */
   public static String getBranch() {
-    return version != null ? version.branch() : "Unknown";
+    return YARN_VERSION_INFO._getBranch();
   }
 
   /**
@@ -78,7 +67,7 @@ public class YarnVersionInfo {
    * @return the compilation date in unix date format
    */
   public static String getDate() {
-    return version != null ? version.date() : "Unknown";
+    return YARN_VERSION_INFO._getDate();
   }
   
   /**
@@ -86,14 +75,14 @@ public class YarnVersionInfo {
    * @return the username of the user
    */
   public static String getUser() {
-    return version != null ? version.user() : "Unknown";
+    return YARN_VERSION_INFO._getUser();
   }
   
   /**
    * Get the subversion URL for the root Yarn directory.
    */
   public static String getUrl() {
-    return version != null ? version.url() : "Unknown";
+    return YARN_VERSION_INFO._getUrl();
   }
 
   /**
@@ -101,7 +90,7 @@ public class YarnVersionInfo {
    * built.
    **/
   public static String getSrcChecksum() {
-    return version != null ? version.srcChecksum() : "Unknown";
+    return YARN_VERSION_INFO._getSrcChecksum();
   }
 
   /**
@@ -109,14 +98,11 @@ public class YarnVersionInfo {
    * revision, user and date. 
    */
   public static String getBuildVersion(){
-    return YarnVersionInfo.getVersion() + 
-    " from " + YarnVersionInfo.getRevision() +
-    " by " + YarnVersionInfo.getUser() + 
-    " source checksum " + YarnVersionInfo.getSrcChecksum();
+    return YARN_VERSION_INFO._getBuildVersion();
   }
   
   public static void main(String[] args) {
-    LOG.debug("version: "+ version);
+    LOG.debug("version: "+ getVersion());
     System.out.println("Yarn " + getVersion());
     System.out.println("Subversion " + getUrl() + " -r " + getRevision());
     System.out.println("Compiled by " + getUser() + " on " + getDate());

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Jan 23 18:51:24 2013
@@ -230,6 +230,17 @@
   <property>
     <description>The class to use as the persistent store.</description>
     <name>yarn.resourcemanager.store.class</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
+  </property>
+
+  <property>
+    <description>URI pointing to the location of the FileSystem path where
+    RM state will be stored. This must be supplied when using
+    org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
+    as the value for yarn.resourcemanager.store.class</description>
+    <name>yarn.resourcemanager.fs.rm-state-store.uri</name>
+    <value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
+    <!--value>hdfs://localhost:9000/rmstore</value-->
   </property>
 
   <property>

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java Wed Jan 23 18:51:24 2013
@@ -199,7 +199,6 @@ public class BaseContainerTokenSecretMan
       this.readLock.unlock();
     }
 
-    return BuilderUtils.newContainerToken(nodeId, ByteBuffer.wrap(password),
-      tokenIdentifier);
+    return BuilderUtils.newContainerToken(nodeId, password, tokenIdentifier);
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml Wed Jan 23 18:51:24 2013
@@ -67,6 +67,9 @@
                     <exec executable="make" dir="${project.build.directory}/native" failonerror="true">
                       <arg line="VERBOSE=1"/>
                     </exec>
+                    <!-- The second make is a workaround for HADOOP-9215.  It can
+                         be removed when version 2.6 of cmake is no longer supported. -->
+                    <exec executable="make" dir="${project.build.directory}/native" failonerror="true"></exec>
                   </target>
                 </configuration>
               </execution>

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Wed Jan 23 18:51:24 2013
@@ -41,6 +41,12 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Jan 23 18:51:24 2013
@@ -28,19 +28,17 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -236,21 +234,6 @@ public class RMAppManager implements Eve
     RMApp application = null;
     try {
 
-      String clientTokenStr = null;
-      if (UserGroupInformation.isSecurityEnabled()) {
-
-        // TODO: This needs to move to per-AppAttempt
-        this.rmContext.getClientToAMTokenSecretManager().registerApplication(
-          applicationId);
-
-        Token<ClientTokenIdentifier> clientToken = new 
-            Token<ClientTokenIdentifier>(
-            new ClientTokenIdentifier(applicationId),
-            this.rmContext.getClientToAMTokenSecretManager());
-        clientTokenStr = clientToken.encodeToUrlString();
-        LOG.debug("Sending client token as " + clientTokenStr);
-      }
-      
       // Sanity checks
       if (submissionContext.getQueue() == null) {
         submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
@@ -265,8 +248,8 @@ public class RMAppManager implements Eve
           new RMAppImpl(applicationId, rmContext, this.conf,
             submissionContext.getApplicationName(),
             submissionContext.getUser(), submissionContext.getQueue(),
-            submissionContext, clientTokenStr, this.scheduler,
-            this.masterService, submitTime);
+            submissionContext, this.scheduler, this.masterService,
+            submitTime);
 
       // Sanity check - duplicate?
       if (rmContext.getRMApps().putIfAbsent(applicationId, application) != 

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1437623&r1=1437622&r2=1437623&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Wed Jan 23 18:51:24 2013
@@ -237,7 +237,7 @@ public class AMLauncher implements Runna
 
       SecretKey clientSecretKey =
           this.rmContext.getClientToAMTokenSecretManager().getMasterKey(
-            applicationId);
+            application.getAppAttemptId());
       String encoded =
           Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
       environment.put(