You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/01/10 16:27:08 UTC

[1/2] git commit: working prototype

Updated Branches:
  refs/heads/helix-provisioning 27f627265 -> f282a3003


working prototype


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9a1ba91c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9a1ba91c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9a1ba91c

Branch: refs/heads/helix-provisioning
Commit: 9a1ba91c0c1e19939360c7916d41b7cc9346407d
Parents: 180aafe
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Jan 10 07:05:38 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Jan 10 07:05:38 2014 -0800

----------------------------------------------------------------------
 helix-provisioning/pom.xml                      |   2 +-
 .../apache/helix/provisioning/yarn/Client.java  | 421 ++++++++-----------
 .../yarn/GenericApplicationMaster.java          |  33 +-
 .../yarn/HelixYarnApplicationMasterMain.java    |  73 +++-
 .../provisioning/yarn/NMCallbackHandler.java    |  20 +-
 .../provisioning/yarn/RMCallbackHandler.java    |  43 +-
 .../provisioning/yarn/YarnProvisioner.java      | 227 +++++++++-
 7 files changed, 502 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9a1ba91c/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
index 1f5a585..d83bbf2 100644
--- a/helix-provisioning/pom.xml
+++ b/helix-provisioning/pom.xml
@@ -26,7 +26,7 @@ under the License.
   </parent>
   <artifactId>helix-provisioning</artifactId>
   <packaging>bundle</packaging>
-  <name>Apache Helix :: HelixAgent</name>
+  <name>Apache Helix :: HelixProvisioning</name>
 
   <properties>
     <hadoop.version>2.2.0</hadoop.version>

http://git-wip-us.apache.org/repos/asf/helix/blob/9a1ba91c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
index b535bd9..3caf8f0 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
@@ -1,6 +1,5 @@
 package org.apache.helix.provisioning.yarn;
 
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,7 +19,9 @@ package org.apache.helix.provisioning.yarn;
  */
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,6 +34,9 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -76,34 +80,40 @@ import org.apache.hadoop.yarn.util.Records;
 
 /**
  * Client for Distributed Shell application submission to YARN.
- * 
- * <p> The distributed shell client allows an application master to be launched that in turn would run 
- * the provided shell command on a set of containers. </p>
- * 
- * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
- * 
- * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
- * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 
- * provides a way for the client to get access to cluster information and to request for a
- * new {@link ApplicationId}. <p>
- * 
- * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
- * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
- * and application name, the priority assigned to the application and the queue
- * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
- * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
- * the {@link ApplicationMaster} is launched. </p>
- * 
- * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
- * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
- * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
- * {@link ApplicationMaster}. <p>
- * 
- * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
- * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
- * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
- * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
- *
+ * <p>
+ * The distributed shell client allows an application master to be launched that in turn would run
+ * the provided shell command on a set of containers.
+ * </p>
+ * <p>
+ * This client is meant to act as an example on how to write yarn-based applications.
+ * </p>
+ * <p>
+ * To submit an application, a client first needs to connect to the <code>ResourceManager</code> aka
+ * ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The
+ * {@link ApplicationClientProtocol} provides a way for the client to get access to cluster
+ * information and to request for a new {@link ApplicationId}.
+ * <p>
+ * <p>
+ * For the actual job submission, the client first has to create an
+ * {@link ApplicationSubmissionContext}. The {@link ApplicationSubmissionContext} defines the
+ * application details such as {@link ApplicationId} and application name, the priority assigned to
+ * the application and the queue to which this application needs to be assigned. In addition to
+ * this, the {@link ApplicationSubmissionContext} also defines the {@link ContainerLaunchContext}
+ * which describes the <code>Container</code> with which the {@link ApplicationMaster} is launched.
+ * </p>
+ * <p>
+ * The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
+ * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made
+ * available and the environment to be set for the {@link ApplicationMaster} and the commands to be
+ * executed to run the {@link ApplicationMaster}.
+ * <p>
+ * <p>
+ * Using the {@link ApplicationSubmissionContext}, the client submits the application to the
+ * <code>ResourceManager</code> and then monitors the application by requesting the
+ * <code>ResourceManager</code> for an {@link ApplicationReport} at regular time intervals. In case
+ * of the application taking too long, the client kills the application by submitting a
+ * {@link KillApplicationRequest} to the <code>ResourceManager</code>.
+ * </p>
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -121,32 +131,21 @@ public class Client {
   // Queue for App master
   private String amQueue = "";
   // Amt. of memory resource to request for to run the App Master
-  private int amMemory = 10; 
+  private int amMemory = 10;
 
   // Application master jar file
-  private String appMasterJar = ""; 
+  private String appMasterArchive = "";
   // Main class to invoke application master
   private final String appMasterMainClass;
 
-  // Shell command to be executed 
-  private String shellCommand = ""; 
-  // Location of shell script 
-  private String shellScriptPath = ""; 
-  // Args to be passed to the shell command
-  private String shellArgs = "";
-  // Env variables to be setup for the shell command 
-  private Map<String, String> shellEnv = new HashMap<String, String>();
-  // Shell Command Container priority 
-  private int shellCmdPriority = 0;
-
   // Amt of memory to request for container in which shell script will be executed
-  private int containerMemory = 10; 
+  private int containerMemory = 10;
   // No. of containers in which the shell script needs to be executed
   private int numContainers = 1;
 
-  // log4j.properties file 
-  // if available, add to local resources and set into classpath 
-  private String log4jPropFile = "";  
+  // log4j.properties file
+  // if available, add to local resources and set into classpath
+  private String log4jPropFile = "";
 
   // Start time for client
   private final long clientStartTime = System.currentTimeMillis();
@@ -154,13 +153,13 @@ public class Client {
   private long clientTimeout = 600000;
 
   // Debug flag
-  boolean debugFlag = false;  
+  boolean debugFlag = false;
 
   // Command line options
   private Options opts;
 
   /**
-   * @param args Command line arguments 
+   * @param args Command line arguments
    */
   public static void main(String[] args) {
     boolean result = false;
@@ -184,18 +183,16 @@ public class Client {
     }
     if (result) {
       LOG.info("Application completed successfully");
-      System.exit(0);     
-    } 
+      System.exit(0);
+    }
     LOG.error("Application failed to complete successfully");
     System.exit(2);
   }
 
   /**
    */
-  public Client(Configuration conf) throws Exception  {
-    this(
-      "org.apache.helix.provisioning.yarn.ApplicationMaster",
-      conf);
+  public Client(Configuration conf) throws Exception {
+    this("org.apache.helix.provisioning.yarn.HelixYarnApplicationMasterMain", conf);
   }
 
   Client(String appMasterMainClass, Configuration conf) {
@@ -208,15 +205,19 @@ public class Client {
     opts.addOption("priority", true, "Application Priority. Default 0");
     opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
     opts.addOption("timeout", true, "Application timeout in milliseconds");
-    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
-    opts.addOption("jar", true, "Jar file containing the application master");
+    opts.addOption("master_memory", true,
+        "Amount of memory in MB to be requested to run the application master");
+    opts.addOption("archive", true, "Jar file containing the application master");
     opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
     opts.addOption("shell_script", true, "Location of the shell script to be executed");
     opts.addOption("shell_args", true, "Command line args for the shell script");
-    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("shell_env", true,
+        "Environment for shell script. Specified as env_key=env_val pairs");
     opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
-    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
-    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
+    opts.addOption("container_memory", true,
+        "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("num_containers", true,
+        "No. of containers on which the shell command needs to be executed");
     opts.addOption("log_properties", true, "log4j.properties file");
     opts.addOption("debug", false, "Dump out debug information");
     opts.addOption("help", false, "Print usage");
@@ -225,7 +226,7 @@ public class Client {
 
   /**
    */
-  public Client() throws Exception  {
+  public Client() throws Exception {
     this(new YarnConfiguration());
   }
 
@@ -238,7 +239,7 @@ public class Client {
 
   /**
    * Parse command line options
-   * @param args Parsed command line options 
+   * @param args Parsed command line options
    * @return Whether the init was successful to run the client
    * @throws ParseException
    */
@@ -248,7 +249,7 @@ public class Client {
 
     if (args.length == 0) {
       throw new IllegalArgumentException("No args specified for client to initialize");
-    }   
+    }
 
     if (cliParser.hasOption("help")) {
       printUsage();
@@ -263,56 +264,27 @@ public class Client {
     appName = cliParser.getOptionValue("appname", "DistributedShell");
     amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
     amQueue = cliParser.getOptionValue("queue", "default");
-    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));   
+    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
 
     if (amMemory < 0) {
-      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
-          + " Specified memory=" + amMemory);
+      throw new IllegalArgumentException(
+          "Invalid memory specified for application master, exiting." + " Specified memory="
+              + amMemory);
     }
 
-    if (!cliParser.hasOption("jar")) {
-      throw new IllegalArgumentException("No jar file specified for application master");
-    }   
-
-    appMasterJar = cliParser.getOptionValue("jar");
-
-    if (!cliParser.hasOption("shell_command")) {
-      throw new IllegalArgumentException("No shell command specified to be executed by application master");
+    if (!cliParser.hasOption("archive")) {
+      throw new IllegalArgumentException("No archive file specified for application master");
     }
-    shellCommand = cliParser.getOptionValue("shell_command");
 
-    if (cliParser.hasOption("shell_script")) {
-      shellScriptPath = cliParser.getOptionValue("shell_script");
-    }
-    if (cliParser.hasOption("shell_args")) {
-      shellArgs = cliParser.getOptionValue("shell_args");
-    }
-    if (cliParser.hasOption("shell_env")) { 
-      String envs[] = cliParser.getOptionValues("shell_env");
-      for (String env : envs) {
-        env = env.trim();
-        int index = env.indexOf('=');
-        if (index == -1) {
-          shellEnv.put(env, "");
-          continue;
-        }
-        String key = env.substring(0, index);
-        String val = "";
-        if (index < (env.length()-1)) {
-          val = env.substring(index+1);
-        }
-        shellEnv.put(key, val);
-      }
-    }
-    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
+    appMasterArchive = cliParser.getOptionValue("archive");
 
     containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
     numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
 
     if (containerMemory < 0 || numContainers < 1) {
-      throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
-          + " Specified containerMemory=" + containerMemory
-          + ", numContainer=" + numContainers);
+      throw new IllegalArgumentException(
+          "Invalid no. of containers or container memory specified, exiting."
+              + " Specified containerMemory=" + containerMemory + ", numContainer=" + numContainers);
     }
 
     clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
@@ -334,55 +306,48 @@ public class Client {
     yarnClient.start();
 
     YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
-    LOG.info("Got Cluster metric info from ASM" 
-        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
+    LOG.info("Got Cluster metric info from ASM" + ", numNodeManagers="
+        + clusterMetrics.getNumNodeManagers());
 
-    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
-        NodeState.RUNNING);
+    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
     LOG.info("Got Cluster node info from ASM");
     for (NodeReport node : clusterNodeReports) {
-      LOG.info("Got node report from ASM for"
-          + ", nodeId=" + node.getNodeId() 
-          + ", nodeAddress" + node.getHttpAddress()
-          + ", nodeRackName" + node.getRackName()
-          + ", nodeNumContainers" + node.getNumContainers());
+      LOG.info("Got node report from ASM for" + ", nodeId=" + node.getNodeId() + ", nodeAddress"
+          + node.getHttpAddress() + ", nodeRackName" + node.getRackName() + ", nodeNumContainers"
+          + node.getNumContainers());
     }
 
     QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
-    LOG.info("Queue info"
-        + ", queueName=" + queueInfo.getQueueName()
-        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
-        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+    LOG.info("Queue info" + ", queueName=" + queueInfo.getQueueName() + ", queueCurrentCapacity="
+        + queueInfo.getCurrentCapacity() + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
         + ", queueApplicationCount=" + queueInfo.getApplications().size()
-        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());   
+        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
 
     List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
     for (QueueUserACLInfo aclInfo : listAclInfo) {
       for (QueueACL userAcl : aclInfo.getUserAcls()) {
-        LOG.info("User ACL Info for Queue"
-            + ", queueName=" + aclInfo.getQueueName()     
-            + ", userAcl=" + userAcl.name());
+        LOG.info("User ACL Info for Queue" + ", queueName=" + aclInfo.getQueueName() + ", userAcl="
+            + userAcl.name());
       }
-    }   
+    }
 
     // Get a new application id
     YarnClientApplication app = yarnClient.createApplication();
     GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
     // TODO get min/max resource capabilities from RM and change memory ask if needed
-    // If we do not have min/max, we may not be able to correctly request 
+    // If we do not have min/max, we may not be able to correctly request
     // the required resources from the RM for the app master
-    // Memory ask has to be a multiple of min and less than max. 
+    // Memory ask has to be a multiple of min and less than max.
     // Dump out information about cluster capability as seen by the resource manager
     int maxMem = appResponse.getMaximumResourceCapability().getMemory();
     LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
 
-    // A resource ask cannot exceed the max. 
+    // A resource ask cannot exceed the max.
     if (amMemory > maxMem) {
       LOG.info("AM memory specified above max threshold of cluster. Using max value."
-          + ", specified=" + amMemory
-          + ", max=" + maxMem);
+          + ", specified=" + amMemory + ", max=" + maxMem);
       amMemory = maxMem;
-    }       
+    }
 
     // set the application name
     ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
@@ -394,15 +359,15 @@ public class Client {
 
     // set local resources for the application master
     // local files or archives as needed
-    // In this scenario, the jar file for the application master is part of the local resources     
+    // In this scenario, the jar file for the application master is part of the local resources
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
 
-    LOG.info("Copy App Master jar from local filesystem and add to local environment");
-    // Copy the application master jar to the filesystem 
-    // Create a local resource to point to the destination jar path 
+    LOG.info("Copy App archive file from local filesystem and add to local environment");
+    // Copy the application master jar to the filesystem
+    // Create a local resource to point to the destination jar path
     FileSystem fs = FileSystem.get(conf);
-    Path src = new Path(appMasterJar);
-    String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";     
+    Path src = new Path(appMasterArchive);
+    String pathSuffix = appName + "/" + appId.getId() + "/app-pkg.tar";
     Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
     fs.copyFromLocalFile(false, true, src, dst);
     FileStatus destStatus = fs.getFileStatus(dst);
@@ -411,21 +376,21 @@ public class Client {
     // Set the type of resource - file or archive
     // archives are untarred at destination
     // we don't need the jar file to be untarred for now
-    amJarRsrc.setType(LocalResourceType.FILE);
-    // Set visibility of the resource 
+    amJarRsrc.setType(LocalResourceType.ARCHIVE);
+    // Set visibility of the resource
     // Setting to most private option
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);    
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
     // Set the resource to be copied over
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); 
-    // Set timestamp and length of file so that the framework 
-    // can do basic sanity checks for the local resource 
-    // after it has been copied over to ensure it is the same 
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+    // Set timestamp and length of file so that the framework
+    // can do basic sanity checks for the local resource
+    // after it has been copied over to ensure it is the same
     // resource the client intended to use with the application
     amJarRsrc.setTimestamp(destStatus.getModificationTime());
     amJarRsrc.setSize(destStatus.getLen());
-    localResources.put("AppMaster.jar",  amJarRsrc);
+    localResources.put("app-pkg", amJarRsrc);
 
-    // Set the log4j properties if needed 
+    // Set the log4j properties if needed
     if (!log4jPropFile.isEmpty()) {
       Path log4jSrc = new Path(log4jPropFile);
       Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
@@ -433,60 +398,48 @@ public class Client {
       FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
       LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
       log4jRsrc.setType(LocalResourceType.FILE);
-      log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);    
+      log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
       log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
       log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
       log4jRsrc.setSize(log4jFileStatus.getLen());
       localResources.put("log4j.properties", log4jRsrc);
-    }     
-
-    // The shell script has to be made available on the final container(s)
-    // where it will be executed. 
-    // To do this, we need to first copy into the filesystem that is visible 
-    // to the yarn framework. 
-    // We do not need to set this as a local resource for the application 
-    // master as the application master does not need it.     
-    String hdfsShellScriptLocation = ""; 
-    long hdfsShellScriptLen = 0;
-    long hdfsShellScriptTimestamp = 0;
-    if (!shellScriptPath.isEmpty()) {
-      Path shellSrc = new Path(shellScriptPath);
-      String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
-      Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
-      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
-      hdfsShellScriptLocation = shellDst.toUri().toString(); 
-      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
-      hdfsShellScriptLen = shellFileStatus.getLen();
-      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
     }
 
     // Set local resource info into app master container launch context
     amContainer.setLocalResources(localResources);
 
     // Set the necessary security tokens as needed
-    //amContainer.setContainerTokens(containerToken);
+    // amContainer.setContainerTokens(containerToken);
 
-    // Set the env variables to be setup in the env where the application master will be run
-    LOG.info("Set the environment for the application master");
-    Map<String, String> env = new HashMap<String, String>();
 
-    // put location of shell script into env
-    // using the env info, the application master will create the correct local resource for the 
-    // eventual containers that will be launched to execute the shell scripts
-    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
-    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
-    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
-
-    // Add AppMaster.jar location to classpath    
-    // At some point we should not be required to add 
-    // the hadoop specific classpaths to the env. 
-    // It should be provided out of the box. 
+    // Add AppMaster.jar location to classpath
+    // At some point we should not be required to add
+    // the hadoop specific classpaths to the env.
+    // It should be provided out of the box.
     // For now setting all required classpaths including
     // the classpath to "." for the application jar
-    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
-      .append(File.pathSeparatorChar).append("./*");
-    for (String c : conf.getStrings(
-        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+    StringBuilder classPathEnv =
+        new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*");
+    StringBuilder appClassPathEnv = new StringBuilder();
+    // put the jar files under the archive in the classpath
+    try {
+      final InputStream is = new FileInputStream(appMasterArchive);
+      final TarArchiveInputStream debInputStream =
+          (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is);
+      TarArchiveEntry entry = null;
+      while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
+        if (entry.isFile()) {
+          appClassPathEnv.append(File.pathSeparatorChar);
+          appClassPathEnv.append("./app-pkg/" + entry.getName());
+        }
+      }
+      debInputStream.close();
+
+    } catch (Exception e) {
+      LOG.error("Unable to read archive file:" + appMasterArchive, e);
+    }
+    classPathEnv.append(appClassPathEnv);
+    for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
         YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
       classPathEnv.append(File.pathSeparatorChar);
       classPathEnv.append(c.trim());
@@ -498,34 +451,32 @@ public class Client {
       classPathEnv.append(':');
       classPathEnv.append(System.getProperty("java.class.path"));
     }
-
+    System.out.println("classpath" + classPathEnv.toString());
+    // Set the env variables to be setup in the env where the application master will be run
+    LOG.info("Set the environment for the application master");
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("app_pkg_path", fs.getHomeDirectory() + "/" + appName + "/" + appId.getId() + "/app-pkg.tar");
+    env.put("appName", appName);
+    env.put("appId", "" + appId.getId());
     env.put("CLASSPATH", classPathEnv.toString());
-
+    env.put("appClasspath", appClassPathEnv.toString());
+    env.put("containerParticipantMainClass", "org.apache.helix.provisioning.yarn.ContainerParticipant");
     amContainer.setEnvironment(env);
 
-    // Set the necessary command to execute the application master 
+    // Set the necessary command to execute the application master
     Vector<CharSequence> vargs = new Vector<CharSequence>(30);
 
-    // Set java executable command 
+    // Set java executable command
     LOG.info("Setting up app master command");
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
     // Set Xmx based on am memory size
     vargs.add("-Xmx" + amMemory + "m");
-    // Set class name 
+    // Set class name
     vargs.add(appMasterMainClass);
     // Set params for Application Master
     vargs.add("--container_memory " + String.valueOf(containerMemory));
     vargs.add("--num_containers " + String.valueOf(numContainers));
-    vargs.add("--priority " + String.valueOf(shellCmdPriority));
-    if (!shellCommand.isEmpty()) {
-      vargs.add("--shell_command " + shellCommand + "");
-    }
-    if (!shellArgs.isEmpty()) {
-      vargs.add("--shell_args " + shellArgs + "");
-    }
-    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
-      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
-    }     
+
     if (debugFlag) {
       vargs.add("--debug");
     }
@@ -539,9 +490,9 @@ public class Client {
       command.append(str).append(" ");
     }
 
-    LOG.info("Completed setting up app master command " + command.toString());     
+    LOG.info("Completed setting up app master command " + command.toString());
     List<String> commands = new ArrayList<String>();
-    commands.add(command.toString());   
+    commands.add(command.toString());
     amContainer.setCommands(commands);
 
     // Set up resource type requirements
@@ -559,13 +510,11 @@ public class Client {
       Credentials credentials = new Credentials();
       String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
       if (tokenRenewer == null || tokenRenewer.length() == 0) {
-        throw new IOException(
-          "Can't get Master Kerberos principal for the RM to use as renewer");
+        throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
       }
 
       // For now, only getting tokens for the default file-system.
-      final Token<?> tokens[] =
-          fs.addDelegationTokens(tokenRenewer, credentials);
+      final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials);
       if (tokens != null) {
         for (Token<?> token : tokens) {
           LOG.info("Got dt for " + fs.getUri() + "; " + token);
@@ -581,7 +530,7 @@ public class Client {
 
     // Set the priority for the application master
     Priority pri = Records.newRecord(Priority.class);
-    // TODO - what is the range for priority? how to decide? 
+    // TODO - what is the range for priority? how to decide?
     pri.setPriority(amPriority);
     appContext.setPriority(pri);
 
@@ -590,7 +539,7 @@ public class Client {
 
     // Submit the application to the applications manager
     // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
-    // Ignore the response as either a valid response object is returned on success 
+    // Ignore the response as either a valid response object is returned on success
     // or an exception thrown to denote some form of a failure
     LOG.info("Submitting application to ASM");
 
@@ -606,15 +555,14 @@ public class Client {
   }
 
   /**
-   * Monitor the submitted application for completion. 
-   * Kill application if time expires. 
+   * Monitor the submitted application for completion.
+   * Kill application if time expires.
    * @param appId Application Id of application to be monitored
    * @return true if application completed successfully
    * @throws YarnException
    * @throws IOException
    */
-  private boolean monitorApplication(ApplicationId appId)
-      throws YarnException, IOException {
+  private boolean monitorApplication(ApplicationId appId) throws YarnException, IOException {
 
     while (true) {
 
@@ -625,68 +573,57 @@ public class Client {
         LOG.debug("Thread sleep in monitoring loop interrupted");
       }
 
-      // Get application report for the appId we are interested in 
+      // Get application report for the appId we are interested in
       ApplicationReport report = yarnClient.getApplicationReport(appId);
 
-      LOG.info("Got application report from ASM for"
-          + ", appId=" + appId.getId()
-          + ", clientToAMToken=" + report.getClientToAMToken()
-          + ", appDiagnostics=" + report.getDiagnostics()
-          + ", appMasterHost=" + report.getHost()
-          + ", appQueue=" + report.getQueue()
-          + ", appMasterRpcPort=" + report.getRpcPort()
-          + ", appStartTime=" + report.getStartTime()
-          + ", yarnAppState=" + report.getYarnApplicationState().toString()
+      LOG.info("Got application report from ASM for" + ", appId=" + appId.getId()
+          + ", clientToAMToken=" + report.getClientToAMToken() + ", appDiagnostics="
+          + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + ", appQueue="
+          + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime="
+          + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString()
           + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
-          + ", appTrackingUrl=" + report.getTrackingUrl()
-          + ", appUser=" + report.getUser());
+          + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser());
 
       YarnApplicationState state = report.getYarnApplicationState();
       FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
       if (YarnApplicationState.FINISHED == state) {
         if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
           LOG.info("Application has completed successfully. Breaking monitoring loop");
-          return true;        
-        }
-        else {
-          LOG.info("Application did finished unsuccessfully."
-              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
-              + ". Breaking monitoring loop");
+          return true;
+        } else {
+          LOG.info("Application did finished unsuccessfully." + " YarnState=" + state.toString()
+              + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
           return false;
-        }       
-      }
-      else if (YarnApplicationState.KILLED == state 
-          || YarnApplicationState.FAILED == state) {
-        LOG.info("Application did not finish."
-            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
-            + ". Breaking monitoring loop");
+        }
+      } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
+        LOG.info("Application did not finish." + " YarnState=" + state.toString()
+            + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
         return false;
-      }     
+      }
 
       if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
         LOG.info("Reached client specified timeout for application. Killing application");
         forceKillApplication(appId);
-        return false;       
+        return false;
       }
-    }     
+    }
 
   }
 
   /**
    * Kill a submitted application by sending a call to the ASM
-   * @param appId Application Id to be killed. 
+   * @param appId Application Id to be killed.
    * @throws YarnException
    * @throws IOException
    */
-  private void forceKillApplication(ApplicationId appId)
-      throws YarnException, IOException {
-    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
-    // the same time. 
+  private void forceKillApplication(ApplicationId appId) throws YarnException, IOException {
+    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
+    // the same time.
     // If yes, can we kill a particular attempt only?
 
-    // Response can be ignored as it is non-null on success or 
+    // Response can be ignored as it is non-null on success or
     // throws an exception in case of failures
-    yarnClient.killApplication(appId);  
+    yarnClient.killApplication(appId);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9a1ba91c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index d3f410f..1b27378 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@ -98,25 +98,23 @@ public class GenericApplicationMaster {
   // Tracking url to which app master publishes info for clients to monitor
   private String appMasterTrackingUrl = "";
 
-  // Counter for completed containers ( complete denotes successful or failed )
-  AtomicInteger numCompletedContainers = new AtomicInteger();
-  // Allocated container count so that we know how many containers has the RM
-  // allocated to us
-  AtomicInteger numAllocatedContainers = new AtomicInteger();
-  // Count of failed containers
-  AtomicInteger numFailedContainers = new AtomicInteger();
-  // Count of containers already requested from the RM
-  // Needed as once requested, we should not request for containers again.
-  // Only request for more if the original requirement changes.
-  AtomicInteger numRequestedContainers = new AtomicInteger();
+
   Map<ContainerRequest, SettableFuture<ContainerAskResponse>> containerRequestMap =
       new LinkedHashMap<AMRMClient.ContainerRequest, SettableFuture<ContainerAskResponse>>();
+  Map<ContainerId, SettableFuture<ContainerReleaseResponse>> containerReleaseMap =
+      new LinkedHashMap<ContainerId, SettableFuture<ContainerReleaseResponse>>();
+  Map<ContainerId, SettableFuture<ContainerStopResponse>> containerStopMap =
+      new LinkedHashMap<ContainerId, SettableFuture<ContainerStopResponse>>();
+  Map<ContainerId, SettableFuture<ContainerLaunchResponse>> containerLaunchResponseMap =
+      new LinkedHashMap<ContainerId, SettableFuture<ContainerLaunchResponse>>();
 
+  
   ByteBuffer allTokens;
 
   // Launch threads
   List<Thread> launchThreads = new ArrayList<Thread>();
 
+
   public GenericApplicationMaster(ApplicationAttemptId appAttemptID) {
     this.appAttemptID = appAttemptID;
     // Set up the configuration
@@ -238,28 +236,31 @@ public class GenericApplicationMaster {
 
 
   public Future<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
-    amRMClient.addContainerRequest(containerAsk);
-    numRequestedContainers.incrementAndGet();
     SettableFuture<ContainerAskResponse> future = SettableFuture.create();
+    containerRequestMap.put(containerAsk, future);
+    amRMClient.addContainerRequest(containerAsk);
     return future;
   }
 
   public Future<ContainerStopResponse> stopContainer(Container container) {
-    nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
     SettableFuture<ContainerStopResponse> future = SettableFuture.create();
+    containerStopMap.put(container.getId(), future);
+    nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
     return future;
   }
 
   public Future<ContainerReleaseResponse> releaseContainer(Container container) {
-    amRMClient.releaseAssignedContainer(container.getId());
     SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
+    containerReleaseMap.put(container.getId(), future);
+    amRMClient.releaseAssignedContainer(container.getId());
     return future;
   }
 
   public Future<ContainerLaunchResponse> launchContainer(Container container,
       ContainerLaunchContext containerLaunchContext) {
-    nmClientAsync.startContainerAsync(container, containerLaunchContext);
     SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();
+    containerLaunchResponseMap.put(container.getId(), future);
+    nmClientAsync.startContainerAsync(container, containerLaunchContext);
     return future;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9a1ba91c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
index 73c0e90..92930ed 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
@@ -1,17 +1,27 @@
 package org.apache.helix.provisioning.yarn;
 
-import java.io.IOException;
 import java.util.Map;
 
 import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
-import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.helix.HelixController;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
 
 /**
  * This will <br/>
@@ -34,28 +44,63 @@ public class HelixYarnApplicationMasterMain {
 
       }
     };
-    ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
+    final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
     server.start();
 
-   
-
     // start
-
     Map<String, String> envs = System.getenv();
 
     ContainerId containerId =
         ConverterUtils.toContainerId(envs.get(Environment.CONTAINER_ID.name()));
     ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
-    
-    //GenericApplicationMaster genAppMaster = new GenericApplicationMaster(appAttemptID);
-    
+
+    // GenericApplicationMaster genAppMaster = new GenericApplicationMaster(appAttemptID);
+
     GenericApplicationMaster genericApplicationMaster = new GenericApplicationMaster(appAttemptID);
     genericApplicationMaster.start();
-    
+
     YarnProvisioner.applicationMaster = genericApplicationMaster;
-    
+
+    String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181";
+    String clusterName = "testCluster";
+    String resourceName = "testResource";
+    int NUM_PARTITIONS = 6;
+    int NUM_REPLICAS = 2;
     // CREATE CLUSTER and setup the resources
-    
-    
+    // connect
+    ZkHelixConnection connection = new ZkHelixConnection(zkAddress);
+    connection.connect();
+
+    // create the cluster
+    ClusterId clusterId = ClusterId.from(clusterName);
+    ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+    StateModelDefinition masterSlave =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
+        masterSlave).build());
+
+    // add the resource with the local provisioner
+    ResourceId resourceId = ResourceId.from(resourceName);
+    ProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+    RebalancerConfig rebalancerConfig =
+        new FullAutoRebalancerConfig.Builder(resourceId).addPartitions(NUM_PARTITIONS)
+            .replicaCount(NUM_REPLICAS).stateModelDefId(masterSlave.getStateModelDefId()).build();
+    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(ResourceId.from(resourceName))
+        .provisionerConfig(provisionerConfig).rebalancerConfig(rebalancerConfig).build());
+
+    // start controller
+    ControllerId controllerId = ControllerId.from("controller1");
+    HelixController controller = connection.createController(clusterId, controllerId);
+    controller.startAsync(); // TODO: is this really async?
+
+    Thread shutdownhook = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        server.shutdown();
+      }
+    });
+    Runtime.getRuntime().addShutdownHook(shutdownhook);
+    Thread.sleep(10000);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9a1ba91c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
index df73946..3735e7a 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
@@ -8,11 +8,14 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.SettableFuture;
 
-@VisibleForTesting class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+@VisibleForTesting
+class NMCallbackHandler implements NMClientAsync.CallbackHandler {
 
   private ConcurrentMap<ContainerId, Container> containers =
       new ConcurrentHashMap<ContainerId, Container>();
@@ -37,28 +40,31 @@ import com.google.common.annotations.VisibleForTesting;
   @Override
   public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
     if (GenericApplicationMaster.LOG.isDebugEnabled()) {
-      GenericApplicationMaster.LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus);
+      GenericApplicationMaster.LOG.debug("Container Status: id=" + containerId + ", status="
+          + containerStatus);
     }
   }
 
   @Override
-  public void onContainerStarted(ContainerId containerId,
-      Map<String, ByteBuffer> allServiceResponse) {
+  public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
     if (GenericApplicationMaster.LOG.isDebugEnabled()) {
       GenericApplicationMaster.LOG.debug("Succeeded to start Container " + containerId);
     }
+
     Container container = containers.get(containerId);
     if (container != null) {
       applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
     }
+    SettableFuture<ContainerLaunchResponse> settableFuture =
+        applicationMaster.containerLaunchResponseMap.get(containerId);
+    ContainerLaunchResponse value = new ContainerLaunchResponse();
+    settableFuture.set(value);
   }
 
   @Override
   public void onStartContainerError(ContainerId containerId, Throwable t) {
     GenericApplicationMaster.LOG.error("Failed to start Container " + containerId);
     containers.remove(containerId);
-    applicationMaster.numCompletedContainers.incrementAndGet();
-    applicationMaster.numFailedContainers.incrementAndGet();
   }
 
   @Override
@@ -71,4 +77,4 @@ import com.google.common.annotations.VisibleForTesting;
     GenericApplicationMaster.LOG.error("Failed to stop Container " + containerId);
     containers.remove(containerId);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9a1ba91c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index 659411c..6c87bd2 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -2,6 +2,8 @@ package org.apache.helix.provisioning.yarn;
 
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -10,7 +12,11 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 
+import com.google.common.util.concurrent.SettableFuture;
+
 class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+  private static final Log LOG = LogFactory.getLog(RMCallbackHandler.class);
+
   /**
    * 
    */
@@ -26,7 +32,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
   @SuppressWarnings("unchecked")
   @Override
   public void onContainersCompleted(List<ContainerStatus> completedContainers) {
-    GenericApplicationMaster.LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+    LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
     for (ContainerStatus containerStatus : completedContainers) {
       GenericApplicationMaster.LOG.info("Got container status for containerID=" + containerStatus.getContainerId()
           + ", state=" + containerStatus.getState() + ", exitStatus="
@@ -34,28 +40,23 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
 
       // non complete containers should not be here
       assert (containerStatus.getState() == ContainerState.COMPLETE);
-
+     
       // increment counters for completed/failed containers
       int exitStatus = containerStatus.getExitStatus();
       if (0 != exitStatus) {
         // container failed
         if (ContainerExitStatus.ABORTED != exitStatus) {
-          // shell script failed
-          // counts as completed
-          _genericApplicationMaster.numCompletedContainers.incrementAndGet();
-          _genericApplicationMaster.numFailedContainers.incrementAndGet();
+      
         } else {
           // container was killed by framework, possibly preempted
           // we should re-try as the container was lost for some reason
-          _genericApplicationMaster.numAllocatedContainers.decrementAndGet();
-          _genericApplicationMaster.numRequestedContainers.decrementAndGet();
+      
           // we do not need to release the container as it would be done
           // by the RM
         }
       } else {
         // nothing to do
         // container completed successfully
-        _genericApplicationMaster.numCompletedContainers.incrementAndGet();
         GenericApplicationMaster.LOG.info("Container completed successfully." + ", containerId="
             + containerStatus.getContainerId());
       }
@@ -65,26 +66,22 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
   @Override
   public void onContainersAllocated(List<Container> allocatedContainers) {
     GenericApplicationMaster.LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
-    _genericApplicationMaster.numAllocatedContainers.addAndGet(allocatedContainers.size());
     for (Container allocatedContainer : allocatedContainers) {
-      GenericApplicationMaster.LOG.info("Launching shell command on a new container." + ", containerId="
+      GenericApplicationMaster.LOG.info("Allocated new container." + ", containerId="
           + allocatedContainer.getId() + ", containerNode="
           + allocatedContainer.getNodeId().getHost() + ":"
           + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
           + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
           + allocatedContainer.getResource().getMemory());
-      // + ", containerToken"
-      // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
-      LaunchContainerRunnable runnableLaunchContainer =
-          new LaunchContainerRunnable(_genericApplicationMaster, allocatedContainer, _genericApplicationMaster.containerListener);
-      Thread launchThread = new Thread(runnableLaunchContainer);
-
-      // launch and start the container on a separate thread to keep
-      // the main thread unblocked
-      // as all containers may not be allocated at one go.
-      _genericApplicationMaster.launchThreads.add(launchThread);
-      launchThread.start();
+      for(ContainerRequest containerRequest: _genericApplicationMaster.containerRequestMap.keySet()){
+        if(containerRequest.getCapability().getMemory() == allocatedContainer.getResource().getMemory()){
+          SettableFuture<ContainerAskResponse> future = _genericApplicationMaster.containerRequestMap.remove(containerRequest);
+          ContainerAskResponse response = new ContainerAskResponse();
+          response.setContainer(allocatedContainer);
+          future.set(response);
+          break;
+        }
+      }     
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9a1ba91c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index bfaa209..f983255 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -1,22 +1,48 @@
 package org.apache.helix.provisioning.yarn;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
+import org.apache.helix.api.config.ContainerConfig;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.provisioner.ContainerId;
 import org.apache.helix.controller.provisioner.ContainerSpec;
@@ -24,11 +50,14 @@ import org.apache.helix.controller.provisioner.ContainerState;
 import org.apache.helix.controller.provisioner.Provisioner;
 import org.apache.helix.controller.provisioner.TargetProviderResponse;
 
+import com.google.common.collect.Lists;
+
 public class YarnProvisioner implements Provisioner {
 
   private static final Log LOG = LogFactory.getLog(YarnProvisioner.class);
   static GenericApplicationMaster applicationMaster;
   Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
+  int DEFAULT_CONTAINER = 4;
 
   @Override
   public ContainerId allocateContainer(ContainerSpec spec) {
@@ -42,10 +71,8 @@ public class YarnProvisioner implements Provisioner {
           ContainerId.from(containerAskResponse.getContainer().getId().toString());
       allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
       return helixContainerId;
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (ExecutionException e) {
-      e.printStackTrace();
+    } catch (Exception e) {
+      LOG.error("Exception in allocateContainer for spec:" + spec, e);
     }
     return null;
   }
@@ -57,12 +84,8 @@ public class YarnProvisioner implements Provisioner {
     try {
       releaseContainer.get();
       return true;
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (ExecutionException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+    } catch (Exception e) {
+      LOG.error("Exception in deallocateContainer containerId:" + containerId, e);
     }
     return false;
   }
@@ -70,13 +93,134 @@ public class YarnProvisioner implements Provisioner {
   @Override
   public boolean startContainer(ContainerId containerId) {
     Container container = allocatedContainersMap.get(containerId);
-    ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
-    applicationMaster.launchContainer(container, containerLaunchContext);
+    try {
+      Future<ContainerLaunchResponse> launchContainer =
+          applicationMaster.launchContainer(container, createLaunchContext(containerId));
+      ContainerLaunchResponse containerLaunchResponse = launchContainer.get();
+      return true;
+    } catch (Exception e) {
+      LOG.error("Exception while starting container containerId:" + containerId, e);
+    }
     return false;
   }
 
+  private ContainerLaunchContext createLaunchContext(ContainerId containerId) throws Exception {
+
+    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+    Map<String, String> envs = System.getenv();
+    String appName = envs.get("appName");
+    String appId = envs.get("appId");
+    String appClasspath = envs.get("appClasspath");
+    String containerParticipantMainClass = envs.get("containerParticipantMainClass");
+    String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181";
+
+    // set the localresources needed to launch container
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+    YarnConfiguration conf = new YarnConfiguration();
+    FileSystem fs;
+    fs = FileSystem.get(conf);
+    String pathSuffix = appName + "/" + appId + "/app-pkg.tar";
+    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+    FileStatus destStatus = fs.getFileStatus(dst);
+
+    // Set the type of resource - file or archive
+    // archives are untarred at destination
+    // we don't need the jar file to be untarred for now
+    amJarRsrc.setType(LocalResourceType.ARCHIVE);
+    // Set visibility of the resource
+    // Setting to most private option
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    // Set the resource to be copied over
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+    // Set timestamp and length of file so that the framework
+    // can do basic sanity checks for the local resource
+    // after it has been copied over to ensure it is the same
+    // resource the client intended to use with the application
+    amJarRsrc.setTimestamp(destStatus.getModificationTime());
+    amJarRsrc.setSize(destStatus.getLen());
+    localResources.put("app-pkg", amJarRsrc);
+
+    // Set local resource info into app master container launch context
+    amContainer.setLocalResources(localResources);
+
+    // Set the necessary security tokens as needed
+    // amContainer.setContainerTokens(containerToken);
+
+    // Set the env variables to be setup in the env where the application master will be run
+    LOG.info("Set the environment for the application master");
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("app-pkg-path", dst.getName());
+    // Add AppMaster.jar location to classpath
+    // At some point we should not be required to add
+    // the hadoop specific classpaths to the env.
+    // It should be provided out of the box.
+    // For now setting all required classpaths including
+    // the classpath to "." for the application jar
+    StringBuilder classPathEnv =
+        new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*");
+    classPathEnv.append(File.pathSeparatorChar);
+    classPathEnv.append(appClasspath);
+
+    for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(File.pathSeparatorChar);
+      classPathEnv.append(c.trim());
+    }
+    classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
+
+    // add the runtime classpath needed for tests to work
+    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      classPathEnv.append(':');
+      classPathEnv.append(System.getProperty("java.class.path"));
+    }
+    System.out.println("classoath" + classPathEnv.toString());
+    env.put("CLASSPATH", classPathEnv.toString());
+
+    amContainer.setEnvironment(env);
+
+    // Set the necessary command to execute the application master
+    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+    // Set java executable command
+    LOG.info("Setting up app master command");
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+    // Set Xmx based on am memory size
+    vargs.add("-Xmx" + 1024 + "m");
+    // Set class name
+    vargs.add(containerParticipantMainClass);
+    // Set params for container participant
+    vargs.add("--zk_address " + zkAddress);
+    vargs.add("--participantId " + containerId.stringify());
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
+
+    // Get final commmand
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    LOG.info("Completed setting up app master command " + command.toString());
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());
+    amContainer.setCommands(commands);
+    return amContainer;
+  }
+
   @Override
   public boolean stopContainer(ContainerId containerId) {
+    Container container = allocatedContainersMap.get(containerId);
+    Future<ContainerStopResponse> stopContainer = applicationMaster.stopContainer(container);
+    try {
+      ContainerStopResponse containerStopResponse = stopContainer.get();
+      return true;
+    } catch (Exception e) {
+      LOG.error("Exception while stopping container containerId:" + containerId, e);
+    }
     return false;
   }
 
@@ -93,8 +237,63 @@ public class YarnProvisioner implements Provisioner {
   @Override
   public TargetProviderResponse evaluateExistingContainers(Cluster cluster, ResourceId resourceId,
       Collection<Participant> participants) {
-    // TODO Auto-generated method stub
-    return null;
+    TargetProviderResponse response = new TargetProviderResponse();
+    // ask for two containers at a time
+    List<ContainerSpec> containersToAcquire = Lists.newArrayList();
+    List<Participant> containersToStart = Lists.newArrayList();
+    List<Participant> containersToRelease = Lists.newArrayList();
+    List<Participant> containersToStop = Lists.newArrayList();
+
+    for (int i = 0; i < DEFAULT_CONTAINER - participants.size(); i++) {
+      containersToAcquire.add(new ContainerSpec(ContainerId.from("container"
+          + (DEFAULT_CONTAINER - i))));
+    }
+    response.setContainersToAcquire(containersToAcquire);
+
+    for (Participant participant : participants) {
+      ContainerConfig containerConfig = participant.getContainerConfig();
+      if (containerConfig != null && containerConfig.getState() != null) {
+        ContainerState state = containerConfig.getState();
+        switch (state) {
+        case ACQUIRED:
+          // acquired containers are ready to start
+          containersToStart.add(participant);
+          break;
+        case ACTIVE:
+
+          break;
+        case HALTED:
+          // halted containers can be released
+          // containersToRelease.add(participant);
+          break;
+        case ACQUIRING:
+          break;
+        case CONNECTING:
+          break;
+        case FAILED:
+          break;
+        case FINALIZED:
+          break;
+        case FINALIZING:
+          break;
+        case TEARDOWN:
+          break;
+        default:
+          break;
+        }
+        ContainerId containerId = containerConfig.getId();
+        if (containerId != null) {
+          // _containerParticipants.put(containerId, participant.getId());
+          // _states.put(containerId, state);
+        }
+      }
+    }
+    response.setContainersToStart(containersToStart);
+    response.setContainersToRelease(containersToRelease);
+    response.setContainersToStop(containersToStop);
+    LOG.info("target provider response containers to acquire:" + response.getContainersToAcquire());
+    LOG.info("target provider response containers to start:" + response.getContainersToStart());
+    return response;
   }
 
   private ContainerRequest setupContainerAskForRM(ContainerSpec spec) {


[2/2] git commit: resolving conflicts

Posted by ki...@apache.org.
resolving conflicts


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f282a300
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f282a300
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f282a300

Branch: refs/heads/helix-provisioning
Commit: f282a30039223a2068350cac12596b55c55dedca
Parents: 9a1ba91 27f6272
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Jan 10 07:26:58 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Jan 10 07:26:58 2014 -0800

----------------------------------------------------------------------
 .../provisioner/ContainerProvider.java          |  10 +-
 .../controller/provisioner/ContainerState.java  |   4 +-
 .../stages/ContainerProvisioningStage.java      | 150 +++++++++++++------
 .../integration/TestLocalContainerProvider.java |  28 +++-
 .../provisioning/yarn/ContainerParticipant.java |  15 ++
 .../yarn/GenericApplicationMaster.java          |  27 ++--
 .../provisioning/yarn/YarnProvisioner.java      |  89 ++++++-----
 .../yarn/YarnProvisionerConfig.java             |  46 ++++++
 8 files changed, 254 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
index 0000000,0000000..b0272e8
new file mode 100644
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
@@@ -1,0 -1,0 +1,15 @@@
++package org.apache.helix.provisioning.yarn;
++
++import java.util.Arrays;
++
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++
++public class ContainerParticipant {
++  private static final Log LOG = LogFactory.getLog(ContainerParticipant.class);
++
++  public static void main(String[] args) throws InterruptedException {
++    LOG.info("Starting participant: "+ Arrays.toString(args));
++    Thread.currentThread().join();
++  }
++}

http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index 1b27378,3b4e937..3adffd6
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@@ -98,17 -93,20 +94,15 @@@ public class GenericApplicationMaster 
    // Tracking url to which app master publishes info for clients to monitor
    private String appMasterTrackingUrl = "";
  
- 
 -  // Counter for completed containers ( complete denotes successful or failed )
 -  AtomicInteger numCompletedContainers = new AtomicInteger();
 -  // Allocated container count so that we know how many containers has the RM
 -  // allocated to us
 -  AtomicInteger numAllocatedContainers = new AtomicInteger();
 -  // Count of failed containers
 -  AtomicInteger numFailedContainers = new AtomicInteger();
 -  // Count of containers already requested from the RM
 -  // Needed as once requested, we should not request for containers again.
 -  // Only request for more if the original requirement changes.
 -  AtomicInteger numRequestedContainers = new AtomicInteger();
    Map<ContainerRequest, SettableFuture<ContainerAskResponse>> containerRequestMap =
        new LinkedHashMap<AMRMClient.ContainerRequest, SettableFuture<ContainerAskResponse>>();
 +  Map<ContainerId, SettableFuture<ContainerReleaseResponse>> containerReleaseMap =
 +      new LinkedHashMap<ContainerId, SettableFuture<ContainerReleaseResponse>>();
 +  Map<ContainerId, SettableFuture<ContainerStopResponse>> containerStopMap =
 +      new LinkedHashMap<ContainerId, SettableFuture<ContainerStopResponse>>();
 +  Map<ContainerId, SettableFuture<ContainerLaunchResponse>> containerLaunchResponseMap =
 +      new LinkedHashMap<ContainerId, SettableFuture<ContainerLaunchResponse>>();
  
-   
    ByteBuffer allTokens;
  
    // Launch threads
@@@ -120,7 -117,7 +113,7 @@@
      // Set up the configuration
      conf = new YarnConfiguration();
    }
--  
++
    /**
     * Dump out contents of $CWD and the environment to stdout for debugging
     */
@@@ -154,15 -151,15 +147,13 @@@
      }
    }
  
--
--
    /**
     * Parse command line options
     * @param args Command line args
     * @return Whether init successful and run should be invoked
     * @throws ParseException
     * @throws IOException
--   * @throws YarnException 
++   * @throws YarnException
     */
    public boolean start() throws ParseException, IOException, YarnException {
  
@@@ -234,33 -231,30 +225,35 @@@
      return true;
    }
  
--
-   public Future<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
+   public ListenableFuture<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
+     amRMClient.addContainerRequest(containerAsk);
 -    numRequestedContainers.incrementAndGet();
      SettableFuture<ContainerAskResponse> future = SettableFuture.create();
 +    containerRequestMap.put(containerAsk, future);
 +    amRMClient.addContainerRequest(containerAsk);
      return future;
    }
  
-   public Future<ContainerStopResponse> stopContainer(Container container) {
+   public ListenableFuture<ContainerStopResponse> stopContainer(Container container) {
+     nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
      SettableFuture<ContainerStopResponse> future = SettableFuture.create();
 +    containerStopMap.put(container.getId(), future);
 +    nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
      return future;
    }
  
-   public Future<ContainerReleaseResponse> releaseContainer(Container container) {
+   public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) {
+     amRMClient.releaseAssignedContainer(container.getId());
      SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
 +    containerReleaseMap.put(container.getId(), future);
 +    amRMClient.releaseAssignedContainer(container.getId());
      return future;
    }
  
-   public Future<ContainerLaunchResponse> launchContainer(Container container,
+   public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container,
        ContainerLaunchContext containerLaunchContext) {
 -    nmClientAsync.startContainerAsync(container, containerLaunchContext);
      SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();
 +    containerLaunchResponseMap.put(container.getId(), future);
 +    nmClientAsync.startContainerAsync(container, containerLaunchContext);
      return future;
    }
  

http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index f983255,e921c87..f74e312
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@@ -1,38 -1,14 +1,39 @@@
  package org.apache.helix.provisioning.yarn;
  
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
 +import java.util.List;
  import java.util.Map;
 +import java.util.Vector;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
+ import java.util.concurrent.Executors;
  
 +import org.apache.commons.compress.archivers.ArchiveStreamFactory;
 +import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.hadoop.security.Credentials;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.hadoop.security.token.Token;
 +import org.apache.hadoop.yarn.api.ApplicationConstants;
 +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
  import org.apache.hadoop.yarn.api.records.Container;
  import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 +import org.apache.hadoop.yarn.api.records.LocalResource;
 +import org.apache.hadoop.yarn.api.records.LocalResourceType;
 +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
  import org.apache.hadoop.yarn.api.records.Priority;
  import org.apache.hadoop.yarn.api.records.Resource;
  import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@@ -50,178 -23,70 +51,190 @@@ import org.apache.helix.controller.prov
  import org.apache.helix.controller.provisioner.Provisioner;
  import org.apache.helix.controller.provisioner.TargetProviderResponse;
  
 +import com.google.common.collect.Lists;
+ import com.google.common.base.Function;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.ListeningExecutorService;
+ import com.google.common.util.concurrent.MoreExecutors;
  
  public class YarnProvisioner implements Provisioner {
  
    private static final Log LOG = LogFactory.getLog(YarnProvisioner.class);
    static GenericApplicationMaster applicationMaster;
 -  static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
++  static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
++      .newCachedThreadPool());
    Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
 +  int DEFAULT_CONTAINER = 4;
  
    @Override
-   public ContainerId allocateContainer(ContainerSpec spec) {
+   public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) {
      ContainerRequest containerAsk = setupContainerAskForRM(spec);
-     Future<ContainerAskResponse> requestNewContainer =
+     ListenableFuture<ContainerAskResponse> requestNewContainer =
          applicationMaster.acquireContainer(containerAsk);
-     ContainerAskResponse containerAskResponse;
-     try {
-       containerAskResponse = requestNewContainer.get();
-       ContainerId helixContainerId =
-           ContainerId.from(containerAskResponse.getContainer().getId().toString());
-       allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
-       return helixContainerId;
-     } catch (Exception e) {
-       LOG.error("Exception in allocateContainer for spec:" + spec, e);
-     }
-     return null;
 -    return Futures.transform(requestNewContainer, new Function<ContainerAskResponse, ContainerId>() {
 -      @Override
 -      public ContainerId apply(ContainerAskResponse containerAskResponse) {
 -        ContainerId helixContainerId =
 -            ContainerId.from(containerAskResponse.getContainer().getId().toString());
 -        allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
 -        return helixContainerId;
 -      }
 -    });
++    return Futures.transform(requestNewContainer,
++        new Function<ContainerAskResponse, ContainerId>() {
++          @Override
++          public ContainerId apply(ContainerAskResponse containerAskResponse) {
++            ContainerId helixContainerId =
++                ContainerId.from(containerAskResponse.getContainer().getId().toString());
++            allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
++            return helixContainerId;
++          }
++        });
++
    }
  
    @Override
-   public boolean deallocateContainer(ContainerId containerId) {
-     Future<ContainerReleaseResponse> releaseContainer =
+   public ListenableFuture<Boolean> deallocateContainer(ContainerId containerId) {
+     ListenableFuture<ContainerReleaseResponse> releaseContainer =
          applicationMaster.releaseContainer(allocatedContainersMap.get(containerId));
-     try {
-       releaseContainer.get();
-       return true;
-     } catch (Exception e) {
-       LOG.error("Exception in deallocateContainer containerId:" + containerId, e);
-     }
-     return false;
+     return Futures.transform(releaseContainer, new Function<ContainerReleaseResponse, Boolean>() {
+       @Override
+       public Boolean apply(ContainerReleaseResponse response) {
+         return response != null;
+       }
+     }, service);
++
    }
  
    @Override
-   public boolean startContainer(ContainerId containerId) {
+   public ListenableFuture<Boolean> startContainer(final ContainerId containerId) {
      Container container = allocatedContainersMap.get(containerId);
 -    ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
 -    ListenableFuture<ContainerLaunchResponse> future = applicationMaster.launchContainer(container, containerLaunchContext);
++    ContainerLaunchContext launchContext;
 +    try {
-       Future<ContainerLaunchResponse> launchContainer =
-           applicationMaster.launchContainer(container, createLaunchContext(containerId));
-       ContainerLaunchResponse containerLaunchResponse = launchContainer.get();
-       return true;
++      launchContext = createLaunchContext(containerId);
 +    } catch (Exception e) {
-       LOG.error("Exception while starting container containerId:" + containerId, e);
++      LOG.error("Exception while creating context to launch container:" + containerId, e);
++      return null;
 +    }
-     return false;
++    ListenableFuture<ContainerLaunchResponse> future =
++        applicationMaster.launchContainer(container, launchContext);
+     return Futures.transform(future, new Function<ContainerLaunchResponse, Boolean>() {
+       @Override
+       public Boolean apply(ContainerLaunchResponse response) {
+         return response != null;
+       }
+     }, service);
    }
  
 +  private ContainerLaunchContext createLaunchContext(ContainerId containerId) throws Exception {
 +
 +    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 +
 +    Map<String, String> envs = System.getenv();
 +    String appName = envs.get("appName");
 +    String appId = envs.get("appId");
 +    String appClasspath = envs.get("appClasspath");
 +    String containerParticipantMainClass = envs.get("containerParticipantMainClass");
 +    String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181";
 +
 +    // set the localresources needed to launch container
 +    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
 +
 +    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
 +    YarnConfiguration conf = new YarnConfiguration();
 +    FileSystem fs;
 +    fs = FileSystem.get(conf);
 +    String pathSuffix = appName + "/" + appId + "/app-pkg.tar";
 +    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
 +    FileStatus destStatus = fs.getFileStatus(dst);
 +
 +    // Set the type of resource - file or archive
 +    // archives are untarred at destination
 +    // we don't need the jar file to be untarred for now
 +    amJarRsrc.setType(LocalResourceType.ARCHIVE);
 +    // Set visibility of the resource
 +    // Setting to most private option
 +    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
 +    // Set the resource to be copied over
 +    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
 +    // Set timestamp and length of file so that the framework
 +    // can do basic sanity checks for the local resource
 +    // after it has been copied over to ensure it is the same
 +    // resource the client intended to use with the application
 +    amJarRsrc.setTimestamp(destStatus.getModificationTime());
 +    amJarRsrc.setSize(destStatus.getLen());
 +    localResources.put("app-pkg", amJarRsrc);
 +
 +    // Set local resource info into app master container launch context
 +    amContainer.setLocalResources(localResources);
 +
 +    // Set the necessary security tokens as needed
 +    // amContainer.setContainerTokens(containerToken);
 +
 +    // Set the env variables to be setup in the env where the application master will be run
 +    LOG.info("Set the environment for the application master");
 +    Map<String, String> env = new HashMap<String, String>();
 +    env.put("app-pkg-path", dst.getName());
 +    // Add AppMaster.jar location to classpath
 +    // At some point we should not be required to add
 +    // the hadoop specific classpaths to the env.
 +    // It should be provided out of the box.
 +    // For now setting all required classpaths including
 +    // the classpath to "." for the application jar
 +    StringBuilder classPathEnv =
 +        new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*");
 +    classPathEnv.append(File.pathSeparatorChar);
 +    classPathEnv.append(appClasspath);
 +
 +    for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
 +        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
 +      classPathEnv.append(File.pathSeparatorChar);
 +      classPathEnv.append(c.trim());
 +    }
 +    classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
 +
 +    // add the runtime classpath needed for tests to work
 +    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
 +      classPathEnv.append(':');
 +      classPathEnv.append(System.getProperty("java.class.path"));
 +    }
 +    System.out.println("classoath" + classPathEnv.toString());
 +    env.put("CLASSPATH", classPathEnv.toString());
 +
 +    amContainer.setEnvironment(env);
 +
 +    // Set the necessary command to execute the application master
 +    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
 +
 +    // Set java executable command
 +    LOG.info("Setting up app master command");
 +    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 +    // Set Xmx based on am memory size
 +    vargs.add("-Xmx" + 1024 + "m");
 +    // Set class name
 +    vargs.add(containerParticipantMainClass);
 +    // Set params for container participant
 +    vargs.add("--zk_address " + zkAddress);
 +    vargs.add("--participantId " + containerId.stringify());
 +
 +    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
 +    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
 +
 +    // Get final commmand
 +    StringBuilder command = new StringBuilder();
 +    for (CharSequence str : vargs) {
 +      command.append(str).append(" ");
 +    }
 +
 +    LOG.info("Completed setting up app master command " + command.toString());
 +    List<String> commands = new ArrayList<String>();
 +    commands.add(command.toString());
 +    amContainer.setCommands(commands);
 +    return amContainer;
 +  }
 +
    @Override
-   public boolean stopContainer(ContainerId containerId) {
+   public ListenableFuture<Boolean> stopContainer(final ContainerId containerId) {
      Container container = allocatedContainersMap.get(containerId);
-     Future<ContainerStopResponse> stopContainer = applicationMaster.stopContainer(container);
-     try {
-       ContainerStopResponse containerStopResponse = stopContainer.get();
-       return true;
-     } catch (Exception e) {
-       LOG.error("Exception while stopping container containerId:" + containerId, e);
-     }
-     return false;
+     ListenableFuture<ContainerStopResponse> future = applicationMaster.stopContainer(container);
+     return Futures.transform(future, new Function<ContainerStopResponse, Boolean>() {
+       @Override
+       public Boolean apply(ContainerStopResponse response) {
+         return response != null;
+       }
+     }, service);
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
index 0000000,0000000..8427c14
new file mode 100644
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
@@@ -1,0 -1,0 +1,46 @@@
++package org.apache.helix.provisioning.yarn;
++
++import org.apache.helix.api.id.ResourceId;
++import org.apache.helix.controller.provisioner.ProvisionerConfig;
++import org.apache.helix.controller.provisioner.ProvisionerRef;
++import org.apache.helix.controller.serializer.DefaultStringSerializer;
++import org.apache.helix.controller.serializer.StringSerializer;
++import org.apache.helix.integration.TestLocalContainerProvider.LocalProvisioner;
++import org.codehaus.jackson.annotate.JsonProperty;
++
++public class YarnProvisionerConfig implements ProvisionerConfig {
++
++  private ResourceId _resourceId;
++  private Class<? extends StringSerializer> _serializerClass;
++  private ProvisionerRef _provisionerRef;
++
++  public YarnProvisionerConfig(@JsonProperty("resourceId") ResourceId resourceId) {
++    _resourceId = resourceId;
++    _serializerClass = DefaultStringSerializer.class;
++    _provisionerRef = ProvisionerRef.from(YarnProvisioner.class.getName());
++  }
++
++  @Override
++  public ResourceId getResourceId() {
++    return _resourceId;
++  }
++
++  @Override
++  public ProvisionerRef getProvisionerRef() {
++    return _provisionerRef;
++  }
++
++  public void setProvisionerRef(ProvisionerRef provisionerRef) {
++    _provisionerRef = provisionerRef;
++  }
++
++  @Override
++  public Class<? extends StringSerializer> getSerializerClass() {
++    return _serializerClass;
++  }
++
++  public void setSerializerClass(Class<? extends StringSerializer> serializerClass) {
++    _serializerClass = serializerClass;
++  }
++
++}