Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/10/25 08:27:25 UTC

[1/2] git commit: Added support for YARN - includes regression test + "s4 yarn" command - also added pluggable mechanism for s4r fetchers - added hdfs s4r fetcher

Updated Branches:
  refs/heads/S4-25 [created] 4e78c3e46


Added support for YARN
- includes a regression test and an "s4 yarn" command
- also added a pluggable mechanism for S4R fetchers
- added an HDFS S4R fetcher


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/4e78c3e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/4e78c3e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/4e78c3e4

Branch: refs/heads/S4-25
Commit: 4e78c3e46ce7114f65d493297ce28314b46f4fb9
Parents: a7f86ac
Author: Matthieu Morel <mm...@apache.org>
Authored: Wed Oct 24 17:38:53 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Oct 25 10:09:55 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |   10 +-
 lib/hadoop-common-tests-2.0.2-alpha.jar            |  Bin 0 -> 1175614 bytes
 lib/hadoop-hdfs-tests-2.0.2-alpha.jar              |  Bin 0 -> 1520018 bytes
 lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar |  Bin 0 -> 39572 bytes
 s4                                                 |   24 +-
 settings.gradle                                    |    3 +-
 subprojects/s4-core/s4-core.gradle                 |   13 +
 .../java/org/apache/s4/core/DefaultCoreModule.java |    9 +
 .../s4/deploy/DistributedDeploymentManager.java    |   18 +-
 .../org/apache/s4/deploy/FileSystemS4RFetcher.java |   12 +
 .../java/org/apache/s4/deploy/HttpS4RFetcher.java  |    6 +
 .../main/java/org/apache/s4/deploy/S4RFetcher.java |    7 +
 .../apache/s4/deploy/TestAutomaticDeployment.java  |    8 +-
 .../src/main/java/org/apache/s4/tools/Deploy.java  |   40 +-
 subprojects/s4-yarn/s4-yarn.gradle                 |  102 +++
 .../org/apache/s4/deploy/HdfsFetcherModule.java    |   18 +
 .../java/org/apache/s4/deploy/HdfsS4RFetcher.java  |   64 ++
 .../apache/s4/tools/yarn/S4ApplicationMaster.java  |  706 +++++++++++++++
 .../org/apache/s4/tools/yarn/S4YarnClient.java     |  480 ++++++++++
 .../java/org/apache/s4/tools/yarn/YarnArgs.java    |   78 ++
 .../org/apache/s4/tools/yarn/package-info.java     |   11 +
 .../apache/s4/tools/yarn/TestYarnDeployment.java   |  197 ++++
 22 files changed, 1772 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 60577de..27c38ed 100644
--- a/build.gradle
+++ b/build.gradle
@@ -40,6 +40,8 @@ allprojects {
         maven { url 'http://google-gson.googlecode.com/svn/mavenrepo' }
         maven { url 'https://repo.springsource.org/libs-release' }
         maven { url 'http://repo.gradle.org/gradle/libs-releases-local' }
+        
+        
 
         /* Add lib dir as a repo. Some jar files that are not available
          in a public repo are distributed in the lib dir. */
@@ -58,6 +60,7 @@ project.ext["libraries"] = [
     guice:              'com.google.inject:guice:3.0',
     aop_alliance:       'aopalliance:aopalliance:1.0',
     guice_assist:       'com.google.inject.extensions:guice-assistedinject:3.0',
+    guice_multibindings:'com.google.inject.extensions:guice-multibindings:3.0',
     kryo:               'com.googlecode:kryo:1.04',
     minlog:             'com.googlecode:minlog:1.2',
     reflectasm:         'com.googlecode:reflectasm:1.01',
@@ -86,7 +89,8 @@ project.ext["libraries"] = [
     gradle_base_services: 'org.gradle:gradle-base-services:1.0',
     gradle_core: 'org.gradle:gradle-core:1.0',
     gradle_tooling_api: 'org.gradle:gradle-tooling-api:1.0',
-    gradle_wrapper: 'gradle-wrapper:gradle-wrapper:1.0'
+    gradle_wrapper: 'gradle-wrapper:gradle-wrapper:1.0',
+
 ]
 
 subprojects {
@@ -114,12 +118,13 @@ subprojects {
         compile ( libraries.guava )
         compile ( libraries.guice )
         compile ( libraries.guice_assist)
+        compile ( libraries.guice_multibindings)
 
         /* Logging. */
         compile( libraries.slf4j )
         compile( libraries.logback_core )
         compile( libraries.logback_classic )
-        runtime( libraries.commons_logging)
+        compile( libraries.commons_logging)
 
         /* Commons. */
         compile( libraries.commons_config )
@@ -146,6 +151,7 @@ subprojects {
         runtime(libraries.asm)
         runtime(libraries.javax_inject)
         runtime(libraries.commons_codec)
+
     }
     
     jar {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/lib/hadoop-common-tests-2.0.2-alpha.jar
----------------------------------------------------------------------
diff --git a/lib/hadoop-common-tests-2.0.2-alpha.jar b/lib/hadoop-common-tests-2.0.2-alpha.jar
new file mode 100644
index 0000000..224b855
Binary files /dev/null and b/lib/hadoop-common-tests-2.0.2-alpha.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/lib/hadoop-hdfs-tests-2.0.2-alpha.jar
----------------------------------------------------------------------
diff --git a/lib/hadoop-hdfs-tests-2.0.2-alpha.jar b/lib/hadoop-hdfs-tests-2.0.2-alpha.jar
new file mode 100644
index 0000000..ff760bb
Binary files /dev/null and b/lib/hadoop-hdfs-tests-2.0.2-alpha.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar
----------------------------------------------------------------------
diff --git a/lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar b/lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar
new file mode 100644
index 0000000..cdd1dc5
Binary files /dev/null and b/lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/s4
----------------------------------------------------------------------
diff --git a/s4 b/s4
index 6471486..6dd51af 100755
--- a/s4
+++ b/s4
@@ -1,14 +1,30 @@
 #!/bin/bash
 
-# NOTE: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
+# NOTE1: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
+# NOTE2: "./gradlew s4-yarn:installApp" will prepare/update the yarn deployment tools if needed
 
 S4_DIR="$( cd -P "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 S4_SCRIPT_PATH="$S4_DIR/s4"
 
-# JVM options for starting nodes and for other s4 tools can be configured here
-# export JAVA_OPTS=-Xmx1G
+case "$1" in
+"yarn")
+    if [ ! -f subprojects/s4-yarn/build/install/s4-yarn/bin/s4-yarn ]
+    then
+        echo "Cannot find s4 yarn script (subprojects/s4-yarn/build/install/s4-yarn/bin/s4-yarn). Please install the script with './gradlew s4-yarn:installApp' "
+        exit 1
+    fi
+    shift 1
+    subprojects/s4-yarn/build/install/s4-yarn/bin/s4-yarn -s4Dir=$S4_DIR $@
+;;
 
-subprojects/s4-tools/build/install/s4-tools/bin/s4-tools -s4ScriptPath=$S4_SCRIPT_PATH $@
+*)
 
+    # JVM options for starting nodes and for other s4 tools can be configured here
+    # export JAVA_OPTS=-Xmx1G
+
+    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools -s4ScriptPath=$S4_SCRIPT_PATH $@
+;;
+
+esac
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index c2b9b67..9e652bb 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -21,7 +21,8 @@ include 's4-core'
 include 's4-comm'
 include 's4-edsl'
 include 's4-example'
-include 's4-tools'
+include 's4-tools'
+include 's4-yarn'
 //include 's4-example'
 //include ':test-apps:simple-adapter-1'
 include ':test-apps:simple-deployable-app-1'

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index 0905ce7..f66ac21 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -34,6 +34,19 @@ dependencies {
     testCompile libraries.gradle_wrapper
     testCompile libraries.mockito_core
 }
+
+configurations {
+    tests
+}
+
+task testJar(type: Jar) {
+    baseName = "test-${project.archivesBaseName}"
+    from sourceSets.test.output
+}
+
+artifacts {
+    tests testJar
+}
 
 test {
     forkEvery=1;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..4c5bb86 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -33,11 +33,15 @@ import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.ft.NoOpCheckpointingFramework;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.deploy.FileSystemS4RFetcher;
+import org.apache.s4.deploy.HttpS4RFetcher;
+import org.apache.s4.deploy.S4RFetcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.multibindings.Multibinder;
 import com.google.inject.name.Names;
 
 /**
@@ -78,6 +82,11 @@ public class DefaultCoreModule extends AbstractModule {
 
         bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
 
+        // allow for pluggable s4r fetching strategies
+        Multibinder<S4RFetcher> s4rFetcherMultibinder = Multibinder.newSetBinder(binder(), S4RFetcher.class);
+        s4rFetcherMultibinder.addBinding().to(FileSystemS4RFetcher.class);
+        s4rFetcherMultibinder.addBinding().to(HttpS4RFetcher.class);
+
         bind(S4RLoaderFactory.class);
 
         // For enabling checkpointing, one needs to use a custom module, such as
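
A note on the multibinder above: every module that obtains a Multibinder for S4RFetcher contributes bindings to one shared set, which Guice injects as a Set<S4RFetcher> (consumed by DistributedDeploymentManager, next file). A minimal, self-contained sketch of that composition (a hypothetical demo class, assuming only the guice-multibindings API added by this commit and the two fetcher classes bound above):

    import java.util.Set;

    import org.apache.s4.deploy.FileSystemS4RFetcher;
    import org.apache.s4.deploy.HttpS4RFetcher;
    import org.apache.s4.deploy.S4RFetcher;

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Key;
    import com.google.inject.TypeLiteral;
    import com.google.inject.multibindings.Multibinder;

    // Hypothetical demo (not part of this commit): set bindings contributed by a
    // module are merged by Guice into a single injectable Set<S4RFetcher>.
    public class FetcherSetDemo {

        static class CoreFetchers extends AbstractModule {
            @Override
            protected void configure() {
                Multibinder<S4RFetcher> fetchers = Multibinder.newSetBinder(binder(), S4RFetcher.class);
                fetchers.addBinding().to(FileSystemS4RFetcher.class);
                fetchers.addBinding().to(HttpS4RFetcher.class);
            }
        }

        public static void main(String[] args) {
            Injector injector = Guice.createInjector(new CoreFetchers());
            Set<S4RFetcher> fetchers = injector.getInstance(Key.get(new TypeLiteral<Set<S4RFetcher>>() {
            }));
            // one element per addBinding() call above
            System.out.println("fetchers in set: " + fetchers.size());
        }
    }

Additional modules (such as HdfsFetcherModule later in this commit) can be passed to the same injector, and their bindings end up in the same set.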

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index 9fc5c53..2951c76 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Set;
 
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
@@ -78,14 +79,17 @@ public class DistributedDeploymentManager implements DeploymentManager {
     private final Server server;
     boolean deployed = false;
 
+    private final Set<S4RFetcher> s4rFetchers;
+
     @Inject
     public DistributedDeploymentManager(@Named("s4.cluster.name") String clusterName,
             @Named("s4.cluster.zk_address") String zookeeperAddress,
             @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server) {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server, Set<S4RFetcher> s4rFetchers) {
 
         this.clusterName = clusterName;
         this.server = server;
+        this.s4rFetchers = s4rFetchers;
 
         zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         zkClient.setZkSerializer(new ZNRecordSerializer());
@@ -146,13 +150,13 @@ public class DistributedDeploymentManager implements DeploymentManager {
     // NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
     // but that's probably not that useful, and we can simply provide whichever protocol is needed
     public InputStream fetchS4App(URI uri) throws DeploymentFailedException {
-        String scheme = uri.getScheme();
-        if ("file".equalsIgnoreCase(scheme)) {
-            return new FileSystemS4RFetcher().fetch(uri);
-        }
-        if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
-            return new HttpS4RFetcher().fetch(uri);
+
+        for (S4RFetcher s4rFetcher : s4rFetchers) {
+            if (s4rFetcher.handlesProtocol(uri)) {
+                return s4rFetcher.fetch(uri);
+            }
         }
+        String scheme = uri.getScheme();
         throw new DeploymentFailedException("Unsupported protocol " + scheme);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
index 8947998..bd0c55a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
@@ -24,18 +24,30 @@ import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.net.URI;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Fetches S4R files from a file system, possibly distributed.
  * 
  */
 public class FileSystemS4RFetcher implements S4RFetcher {
 
+    private static Logger logger = LoggerFactory.getLogger(FileSystemS4RFetcher.class);
+
+    @Override
+    public boolean handlesProtocol(URI uri) {
+        return "file".equalsIgnoreCase(uri.getScheme());
+    }
+
     @Override
     public InputStream fetch(URI uri) throws DeploymentFailedException {
+        logger.debug("Fetching URI through the file system: {}", uri.toString());
         try {
             return new FileInputStream(new File(uri));
         } catch (FileNotFoundException e) {
             throw new DeploymentFailedException("Cannot retrieve file from uri [" + uri.toString() + "]");
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
index 1bb4c01..a8873b6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
@@ -71,6 +71,11 @@ public class HttpS4RFetcher implements S4RFetcher {
     private static Logger logger = LoggerFactory.getLogger(HttpS4RFetcher.class);
 
     @Override
+    public boolean handlesProtocol(URI uri) {
+        return ("http".equalsIgnoreCase(uri.getScheme()) || "https".equalsIgnoreCase(uri.getScheme()));
+    }
+
+    @Override
     public InputStream fetch(URI uri) throws DeploymentFailedException {
         logger.debug("Fetching file through http: {}", uri.toString());
 
@@ -183,4 +188,5 @@ public class HttpS4RFetcher implements S4RFetcher {
             ByteStreams.copy(cbis, fos);
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
index 9c6590f..69765fc 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
@@ -29,6 +29,13 @@ import java.net.URI;
 public interface S4RFetcher {
 
     /**
+     * Checks whether this fetcher can handle the protocol (scheme) of the given URI.
+     * 
+     * @param uri
+     *            URI of an S4R archive
+     * @return true if the URI's protocol is handled by this fetcher
+     */
+    boolean handlesProtocol(URI uri);
+
+    /**
      * Returns a stream to an S4R archive file
      * 
      * @param uri

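Any class implementing these two methods can be contributed to the fetcher set. As an illustration of the contract, here is a hypothetical fetcher (not part of this commit) that resolves a made-up "classpath" scheme; it is assumed to live in the org.apache.s4.deploy package so that S4RFetcher and DeploymentFailedException resolve:

    package org.apache.s4.deploy;

    import java.io.InputStream;
    import java.net.URI;

    // Hypothetical sketch of a conforming implementation; the "classpath" scheme
    // and this class are illustrative only.
    public class ClasspathS4RFetcher implements S4RFetcher {

        @Override
        public boolean handlesProtocol(URI uri) {
            return "classpath".equalsIgnoreCase(uri.getScheme());
        }

        @Override
        public InputStream fetch(URI uri) throws DeploymentFailedException {
            // expects URIs of the form classpath://dir/app.s4r (authority + path = resource name)
            String resource = uri.getAuthority() + uri.getPath();
            InputStream is = getClass().getClassLoader().getResourceAsStream(resource);
            if (is == null) {
                throw new DeploymentFailedException("Cannot find classpath resource [" + resource + "]");
            }
            return is;
        }
    }
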
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 1fbd37a..9f5bc50 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -184,8 +184,8 @@ public class TestAutomaticDeployment extends ZkBasedTest {
         zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
 
             @Override
-            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
-                if (currentChilds.size() == 2) {
+            public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
+                if (currentChildren.size() == 1) {
                     signalProcessesReady.countDown();
                 }
 
@@ -195,8 +195,8 @@ public class TestAutomaticDeployment extends ZkBasedTest {
         forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
 
         // TODO synchro with ready state from zk
-        Thread.sleep(10000);
-        // Assert.assertTrue(signalProcessesReady.await(10, TimeUnit.SECONDS));
+        // Thread.sleep(10000);
+        Assert.assertTrue(signalProcessesReady.await(30, TimeUnit.SECONDS));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 94e359c..30dfb0f 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -66,18 +66,21 @@ public class Deploy extends S4ArgsBase {
                 System.exit(1);
             }
 
-            File s4rToDeploy;
+            java.net.URI s4rURI = null;
 
             if (deployArgs.s4rPath != null) {
-                s4rToDeploy = new File(deployArgs.s4rPath);
-                if (!s4rToDeploy.exists()) {
-                    logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
-                    System.exit(1);
-                } else {
-                    logger.info(
-                            "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
-                            s4rToDeploy.getAbsolutePath());
+                s4rURI = new java.net.URI(deployArgs.s4rPath);
+                if ("file".equalsIgnoreCase(s4rURI.getScheme())) {
+                    File s4rToDeploy = new File(deployArgs.s4rPath);
+                    if (!s4rToDeploy.exists()) {
+                        logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
+                        System.exit(1);
+                    }
                 }
+                logger.info(
+                        "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
+                        s4rURI.toString());
+
             } else {
                 List<String> params = new ArrayList<String>();
                 // prepare gradle -P parameters, including passed gradle opts
@@ -89,21 +92,22 @@ public class Deploy extends S4ArgsBase {
                 File tmpS4R = new File(tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r");
                 if (!Strings.isNullOrEmpty(deployArgs.generatedS4R)) {
                     logger.info("Copying generated S4R to [{}]", deployArgs.generatedS4R);
-                    s4rToDeploy = new File(deployArgs.generatedS4R);
+                    File s4rToDeploy = new File(deployArgs.generatedS4R);
                     if (!(ByteStreams.copy(Files.newInputStreamSupplier(tmpS4R),
                             Files.newOutputStreamSupplier(s4rToDeploy)) > 0)) {
                         logger.error("Cannot copy generated s4r from {} to {}", tmpS4R.getAbsolutePath(),
                                 s4rToDeploy.getAbsolutePath());
                         System.exit(1);
+                    } else {
+                        s4rURI = s4rToDeploy.toURI();
                     }
                 } else {
-                    s4rToDeploy = tmpS4R;
+                    s4rURI = tmpS4R.toURI();
                 }
             }
 
-            final String uri = s4rToDeploy.toURI().toString();
             ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+            record.putSimpleField(DistributedDeploymentManager.S4R_URI, s4rURI.toString());
             record.putSimpleField("name", deployArgs.appName);
             String deployedAppPath = "/s4/clusters/" + deployArgs.clusterName + "/app/s4App";
             if (zkClient.exists(deployedAppPath)) {
@@ -117,11 +121,12 @@ public class Deploy extends S4ArgsBase {
             logger.info(
                     "uploaded application [{}] to cluster [{}], using zookeeper znode [{}], and s4r file [{}]",
                     new String[] { deployArgs.appName, deployArgs.clusterName,
-                            "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName,
-                            s4rToDeploy.getAbsolutePath() });
+                            "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName, s4rURI.toString() });
 
             // Explicitly shutdown the JVM since Gradle leaves non-daemon threads running that delay the termination
-            System.exit(0);
+            if (deployArgs.shutdown) {
+                System.exit(0);
+            }
         } catch (Exception e) {
             LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
         }
@@ -155,6 +160,9 @@ public class Deploy extends S4ArgsBase {
         @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
         int timeout = 10000;
 
+        @Parameter(names = "-shutdown", description = "Shut down the JVM after deployment. Useful to avoid waiting for remaining long-running threads from Gradle", arity = 1)
+        boolean shutdown = true;
+
     }
 
     static class ExecGradle {

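For context on what happens next: the znode written above is what DistributedDeploymentManager watches on the node side. A hedged sketch of reading the record back (this assumes ZNRecord exposes a getSimpleField() accessor mirroring the putSimpleField() calls above, and that ZNRecord/ZNRecordSerializer live in the s4-comm topology package, as used by DistributedDeploymentManager):

    import org.I0Itec.zkclient.ZkClient;
    import org.apache.s4.comm.topology.ZNRecord;
    import org.apache.s4.comm.topology.ZNRecordSerializer;
    import org.apache.s4.deploy.DistributedDeploymentManager;

    // Hedged sketch (not part of this commit): reads the S4R URI published by
    // the deploy tool for cluster "cluster1".
    public class ReadPublishedS4R {
        public static void main(String[] args) {
            ZkClient zkClient = new ZkClient("localhost:2181");
            zkClient.setZkSerializer(new ZNRecordSerializer());
            ZNRecord app = zkClient.readData("/s4/clusters/cluster1/app/s4App");
            System.out.println("published S4R URI: " + app.getSimpleField(DistributedDeploymentManager.S4R_URI));
            zkClient.close();
        }
    }
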
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/s4-yarn.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/s4-yarn.gradle b/subprojects/s4-yarn/s4-yarn.gradle
new file mode 100644
index 0000000..215d93a
--- /dev/null
+++ b/subprojects/s4-yarn/s4-yarn.gradle
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+description = 'The S4 yarn tool'
+
+apply plugin: 'java'
+
+
+task "create-dirs" << {
+   sourceSets.all*.java.srcDirs*.each { it.mkdirs() }
+   sourceSets.all*.resources.srcDirs*.each { it.mkdirs() }
+}
+
+configurations.all {
+    transitive = true
+}
+
+
+dependencies {
+    compile project(":s4-base")
+    compile project(":s4-comm")
+    compile project(":s4-core")
+    compile project(":s4-tools")
+    compile libraries.jcommander
+    compile libraries.gradle_base_services
+    compile libraries.gradle_core
+    compile libraries.gradle_tooling_api
+    compile libraries.gradle_wrapper
+    compile libraries.log4j
+    compile libraries.zkclient
+    
+    testCompile project(path: ':s4-comm', configuration: 'tests')
+    testCompile project(path: ':s4-core', configuration: 'tests')
+    
+    compile('commons-cli:commons-cli:1.2')
+    runtime('org.apache.hadoop:hadoop-auth:2.0.2-alpha') {transitive = true}
+    compile('org.apache.hadoop:hadoop-common:2.0.2-alpha') {transitive = true} 
+    runtime('org.apache.hadoop:hadoop-hdfs:2.0.2-alpha') {transitive = true}
+    testRuntime('org.apache.hadoop:hadoop-yarn-server:2.0.2-alpha') {transitive = true}
+    compile('org.apache.hadoop:hadoop-yarn-client:2.0.2-alpha') {transitive = true}
+    testRuntime('org.apache.hadoop:hadoop-yarn-server-common:2.0.2-alpha') {transitive = true}
+    testRuntime('org.apache.hadoop:hadoop-yarn-server-nodemanager:2.0.2-alpha') {transitive = true}
+    testRuntime('org.apache.hadoop:hadoop-yarn-server-resourcemanager:2.0.2-alpha') {transitive = true}
+    testRuntime('org.apache.hadoop:hadoop-yarn-server-web-proxy:2.0.2-alpha') {transitive = true}
+    runtime('org.apache.hadoop:hadoop-mapreduce-client-shuffle:2.0.2-alpha') {transitive = true}
+    // NOTE: the test jars are available on Maven but we can't find a way to reference them, so we copied them locally with modified names
+    testCompile('org.apache.hadoop:hadoop-common-tests:2.0.2-alpha') {transitive = true}
+    testCompile('org.apache.hadoop:hadoop-hdfs-tests:2.0.2-alpha@jar') {transitive = true}
+    testCompile('org.apache.hadoop:hadoop-yarn-server-tests:2.0.2-alpha-tests') {transitive = true}
+    testRuntime('javax.servlet:servlet-api:2.5')
+    testRuntime('com.google.inject.extensions:guice-servlet:3.0') {transitive=true}
+    compile('org.apache.hadoop:hadoop-yarn-api:2.0.2-alpha') {transitive = true}
+    compile('org.apache.hadoop:hadoop-yarn-common:2.0.2-alpha') {transitive = true}
+    runtime('com.google.protobuf:protobuf-java:2.4.1')
+    // The problem with commons-daemon is that there is a dependency on 1.0.3, but that version is not available in Maven repos.
+    // For that reason we explicitly specify version 1.0.10.
+    compile('commons-daemon:commons-daemon:1.0.10') {transitive = true}
+    runtime('org.apache.zookeeper:zookeeper:3.3.1'){force=true}
+
+}
+
+apply plugin:'application'
+mainClassName = "org.apache.s4.tools.yarn.S4YarnClient"
+
+task copyTemplates << {
+    copy {
+        from 'src/main/java/hello'
+        into 'src/main/resources'
+    }
+}
+
+run {
+    // run doesn't yet directly accept command line parameters...
+    if ( project.hasProperty('args') ) {
+        args project.args.split('\\s+')
+        print args
+    }
+}
+
+
+test {
+    forkEvery=1
+    // testLogging.showStandardStreams = true
+}
+
+test.dependsOn installApp

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsFetcherModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsFetcherModule.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsFetcherModule.java
new file mode 100644
index 0000000..97e5758
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsFetcherModule.java
@@ -0,0 +1,18 @@
+package org.apache.s4.deploy;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
+
+/**
+ * This module plugs an HDFS S4R fetcher into the node configuration.
+ * 
+ */
+public class HdfsFetcherModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        Multibinder<S4RFetcher> s4rFetcherMultibinder = Multibinder.newSetBinder(binder(), S4RFetcher.class);
+        s4rFetcherMultibinder.addBinding().to(HdfsS4RFetcher.class);
+    }
+
+}
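
On the node side this module is activated through the node's command line; S4ApplicationMaster, later in this commit, passes it with an -emc argument. A small sketch of that argument construction (zkString and cluster are placeholder values):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.s4.deploy.HdfsFetcherModule;

    // Sketch mirroring the container command line built by S4ApplicationMaster
    // later in this commit; the -emc flag plugs HdfsFetcherModule into the node.
    public class NodeArgsSketch {
        public static void main(String[] args) {
            String zkString = "localhost:2181"; // placeholder
            String cluster = "cluster1";        // placeholder
            List<String> vargs = new ArrayList<String>();
            vargs.add("java");
            vargs.add("org.apache.s4.core.Main");
            vargs.add("-zk=" + zkString);
            vargs.add("-c=" + cluster);
            vargs.add("-emc=" + HdfsFetcherModule.class.getName());
            System.out.println(vargs);
        }
    }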

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsS4RFetcher.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsS4RFetcher.java
new file mode 100644
index 0000000..1f5e5d2
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsS4RFetcher.java
@@ -0,0 +1,64 @@
+package org.apache.s4.deploy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Fetches S4R archives from HDFS, i.e. S4Rs published with an "hdfs://path/to/the/file"-like URI.
+ * 
+ * The HDFS configuration is fetched from the classpath (typically, from core-site.xml).
+ * 
+ */
+public class HdfsS4RFetcher implements S4RFetcher {
+
+    private static Logger logger = LoggerFactory.getLogger(HdfsS4RFetcher.class);
+
+    final Configuration conf;
+
+    // the fs name holder is optional and only used by tests. Default value is null
+    static class FsNameHolder {
+        @Inject(optional = true)
+        @Named(FileSystem.FS_DEFAULT_NAME_KEY)
+        String value = null;
+    }
+
+    @Inject
+    public HdfsS4RFetcher(FsNameHolder fsNameHolder) {
+        this.conf = new YarnConfiguration();
+        if (fsNameHolder.value != null) {
+            conf.set(FileSystem.FS_DEFAULT_NAME_KEY, fsNameHolder.value);
+        }
+    }
+
+    @Override
+    public boolean handlesProtocol(URI uri) {
+        return "hdfs".equalsIgnoreCase(uri.getScheme());
+    }
+
+    @Override
+    public InputStream fetch(URI uri) throws DeploymentFailedException {
+        try {
+            logger.info("Fetching S4R through hdfs from uri {}", uri.toString());
+            FileSystem fs = FileSystem.get(conf);
+            Path s4rPath = new Path(uri);
+            if (!fs.exists(s4rPath)) {
+                fs.close();
+                throw new DeploymentFailedException("Cannot find S4R file at URI: " + uri.toString());
+            }
+            return fs.open(s4rPath);
+        } catch (IOException e) {
+            throw new DeploymentFailedException("Cannot fetch S4R file at URI: " + uri.toString(), e);
+        }
+    }
+}
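
The optional injection on FsNameHolder means the fetcher needs no extra configuration in production (the filesystem name then comes from core-site.xml), while tests can retarget it by binding the named constant. A sketch of such a test-side module (hypothetical class name; the "hdfs://localhost:9000" value matches the test-mode default used by S4ApplicationMaster below):

    import org.apache.hadoop.fs.FileSystem;

    import com.google.inject.AbstractModule;
    import com.google.inject.name.Names;

    // Hypothetical test module: supplies the optional named value consumed by
    // HdfsS4RFetcher.FsNameHolder, pointing the fetcher at a local HDFS instance.
    public class TestFsNameModule extends AbstractModule {
        @Override
        protected void configure() {
            bindConstant().annotatedWith(Names.named(FileSystem.FS_DEFAULT_NAME_KEY))
                    .to("hdfs://localhost:9000");
        }
    }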

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
new file mode 100644
index 0000000..a3e8702
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
@@ -0,0 +1,706 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools.yarn;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.s4.deploy.HdfsFetcherModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+/**
+ * An ApplicationMaster for launching S4 nodes on a set of launched containers using the YARN framework.
+ * 
+ * The code is inspired by the shell example from
+ * http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html
+ * 
+ * 
+ */
+public class S4ApplicationMaster {
+
+    private static Logger logger = LoggerFactory.getLogger(S4ApplicationMaster.class);
+
+    // Configuration
+    private Configuration conf;
+
+    // YARN RPC to communicate with the Resource Manager or Node Manager
+    private YarnRPC rpc;
+
+    // Handle to communicate with the Resource Manager
+    private AMRMProtocol resourceManager;
+
+    // Application Attempt Id ( combination of attemptId and fail count )
+    private ApplicationAttemptId appAttemptID;
+
+    // For status update for clients - yet to be implemented
+    // Hostname of the container
+    private String appMasterHostname = "";
+    // Port on which the app master listens for status update requests from clients
+    private int appMasterRpcPort = 0;
+    // Tracking url to which app master publishes info for clients to monitor
+    private String appMasterTrackingUrl = "";
+
+    // App Master configuration
+    // No. of containers to host S4 nodes on
+    private int numTotalContainers = 1;
+    // Memory to request for the container on which the S4 nodes will be hosted
+    private int containerMemory = 10;
+    // Priority of the request
+    private int requestPriority;
+
+    // Incremental counter for rpc calls to the RM
+    private AtomicInteger rmRequestID = new AtomicInteger();
+
+    // Simple flag to denote whether all work is done
+    private boolean appDone = false;
+    // Counter for completed containers ( complete denotes successful or failed )
+    private AtomicInteger numCompletedContainers = new AtomicInteger();
+    // Allocated container count so that we know how many containers the RM
+    // has allocated to us
+    private AtomicInteger numAllocatedContainers = new AtomicInteger();
+    // Count of failed containers
+    private AtomicInteger numFailedContainers = new AtomicInteger();
+    // Count of containers already requested from the RM
+    // Needed as once requested, we should not request containers again and again.
+    // Only request for more if the original requirement changes.
+    private AtomicInteger numRequestedContainers = new AtomicInteger();
+
+    // The cluster (or application) name
+    private String cluster = "";
+
+    private String zkString = "";
+
+    // Containers to be released
+    private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
+
+    // Launch threads
+    private List<Thread> launchThreads = new ArrayList<Thread>();
+
+    private boolean testMode;
+
+    private String user;
+
+    /**
+     * @param args
+     *            Command line args
+     */
+    public static void main(String[] args) {
+        boolean result = false;
+        try {
+
+            S4ApplicationMaster appMaster = new S4ApplicationMaster();
+            logger.info("Initializing ApplicationMaster with args " + Arrays.toString(args));
+            boolean doRun = appMaster.init(args);
+            if (!doRun) {
+                System.exit(0);
+            }
+            result = appMaster.run();
+        } catch (Throwable t) {
+            t.printStackTrace();
+            logger.error("Error running ApplicationMaster", t);
+            System.exit(1);
+        }
+        if (result) {
+            logger.info("Application Master completed successfully. exiting");
+            System.exit(0);
+        } else {
+            logger.info("Application Master failed. exiting");
+            System.exit(2);
+        }
+    }
+
+    public S4ApplicationMaster() throws Exception {
+
+        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("Uncaught exception in thread {}", t.getName(), e);
+
+            }
+        });
+    }
+
+    /**
+     * Parse command line options
+     * 
+     * @param args
+     *            Command line args
+     * @return Whether init successful and run should be invoked
+     * @throws ParseException
+     * @throws IOException
+     */
+    public boolean init(String[] args) throws ParseException, IOException {
+
+        Options opts = new Options();
+        opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used except for testing purposes");
+        opts.addOption("c", "cluster", true, "The cluster (or application) name");
+        opts.addOption("zk", true, "Zookeeper connection string");
+        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to host the S4 node");
+        opts.addOption("num_containers", true, "No. of containers on which the S4 node needs to be hosted");
+        opts.addOption("priority", true, "Application Priority. Default 0");
+        opts.addOption("debug", false, "Dump out debug information");
+        opts.addOption("test", false, "Test mode");
+        opts.addOption("user", true, "User name for the launched containers. Defaults to the current user");
+
+        opts.addOption("help", false, "Print usage");
+        CommandLine cliParser = new GnuParser().parse(opts, args);
+
+        if (args.length == 0) {
+            printUsage(opts);
+            throw new IllegalArgumentException("No args specified for application master to initialize");
+        }
+
+        if (cliParser.hasOption("help")) {
+            printUsage(opts);
+            return false;
+        }
+
+        Map<String, String> envs = System.getenv();
+
+        appAttemptID = Records.newRecord(ApplicationAttemptId.class);
+        if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+            if (cliParser.hasOption("app_attempt_id")) {
+                String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
+                appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
+            } else {
+                throw new IllegalArgumentException("Application Attempt Id not set in the environment");
+            }
+        } else {
+            ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
+            appAttemptID = containerId.getApplicationAttemptId();
+        }
+
+        logger.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
+                + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
+                + appAttemptID.getAttemptId());
+        logger.info("Application master args = " + Arrays.toString(cliParser.getArgs()));
+        for (Option option : cliParser.getOptions()) {
+            logger.info(option.getArgName() + " / " + option.getDescription() + " / " + option.getOpt() + " / "
+                    + option.getValue());
+        }
+        logger.info("Application master options = " + Arrays.toString(cliParser.getOptions()));
+
+        if (!cliParser.hasOption("cluster")) {
+            throw new IllegalArgumentException("No cluster ID specified to be provisioned by application master");
+        }
+        cluster = cliParser.getOptionValue("cluster");
+        zkString = cliParser.getOptionValue("zk");
+
+        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "128"));
+        numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+        requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+        user = cliParser.getOptionValue("user");
+
+        conf = new YarnConfiguration();
+        if (cliParser.hasOption("test")) {
+            testMode = true;
+            conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+        }
+        rpc = YarnRPC.create(conf);
+
+        return true;
+    }
+
+    /**
+     * Helper function to print usage
+     * 
+     * @param opts
+     *            Parsed command line options
+     */
+    private void printUsage(Options opts) {
+        new HelpFormatter().printHelp("ApplicationMaster", opts);
+    }
+
+    /**
+     * Main run function for the application master
+     * 
+     * @throws YarnRemoteException
+     */
+    public boolean run() throws YarnRemoteException {
+        logger.info("Starting ApplicationMaster");
+
+        // Connect to ResourceManager
+        resourceManager = connectToRM();
+
+        // Setup local RPC Server to accept status requests directly from clients
+
+        // Register self with ResourceManager
+        RegisterApplicationMasterResponse response = registerToRM();
+        // Dump out information about cluster capability as seen by the resource manager
+        int minMem = response.getMinimumResourceCapability().getMemory();
+        int maxMem = response.getMaximumResourceCapability().getMemory();
+        logger.info("Min mem capability of resources in this cluster " + minMem);
+        logger.info("Max mem capability of resources in this cluster " + maxMem);
+
+        // A resource ask has to be at least the minimum capability of the cluster, the value has to be
+        // a multiple of the min value and cannot exceed the max.
+        // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
+        if (containerMemory < minMem) {
+            logger.info("Container memory specified below min threshold of cluster. Using min value." + ", specified="
+                    + containerMemory + ", min=" + minMem);
+            containerMemory = minMem;
+        } else if (containerMemory > maxMem) {
+            logger.info("Container memory specified above max threshold of cluster. Using max value." + ", specified="
+                    + containerMemory + ", max=" + maxMem);
+            containerMemory = maxMem;
+        }
+
+        int loopCounter = -1;
+
+        while (numCompletedContainers.get() < numTotalContainers && !appDone) {
+            loopCounter++;
+
+            // log current state
+            logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total="
+                    + numTotalContainers + ", requested=" + numRequestedContainers + ", completed="
+                    + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated="
+                    + numAllocatedContainers);
+
+            // Sleep before each loop when asking RM for containers
+            // to avoid flooding RM with spurious requests when it
+            // need not have any available containers
+            // Sleeping for 1000 ms.
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.info("Sleep interrupted " + e.getMessage());
+            }
+
+            // No. of containers to request
+            // For the first loop, askCount will be equal to total containers needed
+            // From that point on, askCount will always be 0 as current implementation
+            // does not change its ask on container failures.
+            int askCount = numTotalContainers - numRequestedContainers.get();
+            numRequestedContainers.addAndGet(askCount);
+
+            // Setup request to be sent to RM to allocate containers
+            List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
+            if (askCount > 0) {
+                ResourceRequest containerAsk = setupContainerAskForRM(askCount);
+                resourceReq.add(containerAsk);
+            }
+
+            // Send the request to RM
+            logger.info("Asking RM for containers" + ", askCount=" + askCount);
+            AMResponse amResp = sendContainerAskToRM(resourceReq);
+
+            // Retrieve list of allocated containers from the response
+            List<Container> allocatedContainers = amResp.getAllocatedContainers();
+            logger.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+            numAllocatedContainers.addAndGet(allocatedContainers.size());
+            for (Container allocatedContainer : allocatedContainers) {
+                logger.info("Launching S4 node on a new container." + ", containerId=" + allocatedContainer.getId()
+                        + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":"
+                        + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+                        + allocatedContainer.getNodeHttpAddress() + ", containerState" + allocatedContainer.getState()
+                        + ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
+                // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
+
+                LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
+                Thread launchThread = new Thread(runnableLaunchContainer);
+
+                // launch and start the container on a separate thread to keep the main thread unblocked
+                // as all containers may not be allocated at one go.
+                launchThreads.add(launchThread);
+                launchThread.start();
+            }
+
+            // Check what the current available resources in the cluster are
+            // TODO should we do anything if the available resources are not enough?
+            Resource availableResources = amResp.getAvailableResources();
+            logger.info("Current available resources in the cluster " + availableResources);
+
+            // Check the completed containers
+            List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
+            logger.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+            for (ContainerStatus containerStatus : completedContainers) {
+                logger.info("Got container status for containerID= " + containerStatus.getContainerId() + ", state="
+                        + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus()
+                        + ", diagnostics=" + containerStatus.getDiagnostics());
+
+                // non-complete containers should not be here
+                assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+                // increment counters for completed/failed containers
+                int exitStatus = containerStatus.getExitStatus();
+                if (0 != exitStatus) {
+                    // container failed
+                    if (-100 != exitStatus) {
+                        // the S4 node process failed
+                        // counts as completed
+                        numCompletedContainers.incrementAndGet();
+                        numFailedContainers.incrementAndGet();
+                    } else {
+                        // something else bad happened
+                        // app job did not complete for some reason
+                        // we should re-try as the container was lost for some reason
+                        numAllocatedContainers.decrementAndGet();
+                        numRequestedContainers.decrementAndGet();
+                        // we do not need to release the container as it would be done
+                        // by the RM/CM.
+                    }
+                } else {
+                    // nothing to do
+                    // container completed successfully
+                    numCompletedContainers.incrementAndGet();
+                    logger.info("Container completed successfully." + ", containerId="
+                            + containerStatus.getContainerId());
+                }
+
+            }
+            if (numCompletedContainers.get() == numTotalContainers) {
+                appDone = true;
+            }
+
+            logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total="
+                    + numTotalContainers + ", requested=" + numRequestedContainers + ", completed="
+                    + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated="
+                    + numAllocatedContainers);
+
+        }
+
+        // Join all launched threads
+        // needed for when we time out
+        // and we need to release containers
+        for (Thread launchThread : launchThreads) {
+            try {
+                launchThread.join(0);
+            } catch (InterruptedException e) {
+                logger.info("Exception thrown in thread join: " + e.getMessage());
+                e.printStackTrace();
+            }
+        }
+
+        // When the application completes, it should send a finish application signal
+        // to the RM
+        logger.info("Application completed. Signalling finish to RM");
+
+        FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
+        finishReq.setAppAttemptId(appAttemptID);
+        boolean isSuccess = true;
+        if (numFailedContainers.get() == 0) {
+            finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+        } else {
+            finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+            String diagnostics = "Diagnostics." + ", total=" + numTotalContainers + ", completed="
+                    + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed="
+                    + numFailedContainers.get();
+            finishReq.setDiagnostics(diagnostics);
+            isSuccess = false;
+        }
+        resourceManager.finishApplicationMaster(finishReq);
+        return isSuccess;
+    }
+
+    /**
+     * Thread to connect to the {@link ContainerManager} and launch the container that will execute the shell command.
+     */
+    private class LaunchContainerRunnable implements Runnable {
+
+        // Allocated container
+        Container container;
+        // Handle to communicate with ContainerManager
+        ContainerManager cm;
+
+        /**
+         * @param lcontainer
+         *            Allocated container
+         */
+        public LaunchContainerRunnable(Container lcontainer) {
+            this.container = lcontainer;
+        }
+
+        /**
+         * Helper function to connect to CM
+         */
+        private void connectToCM() {
+            logger.debug("Connecting to ContainerManager for containerid=" + container.getId());
+            String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort();
+            InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+            logger.info("Connecting to ContainerManager at " + cmIpPortStr);
+            this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
+        }
+
+        @Override
+        /**
+         * Connects to CM, sets up container launch context
+         * for shell command and eventually dispatches the container
+         * start request to the CM.
+         */
+        public void run() {
+            // Connect to ContainerManager
+            connectToCM();
+
+            logger.info("Setting up container launch context for containerid=" + container.getId());
+            ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+            ctx.setContainerId(container.getId());
+            ctx.setResource(container.getResource());
+
+            try {
+                if (!Strings.isNullOrEmpty(user)) {
+                    ctx.setUser(user);
+                } else {
+                    logger.info("Using default user name {}", UserGroupInformation.getCurrentUser().getShortUserName());
+                    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+                }
+            } catch (IOException e) {
+                logger.info("Getting current user info failed when trying to launch the container: " + e.getMessage());
+            }
+
+            // Set the local resources
+            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+            try {
+                FileSystem fs = FileSystem.get(conf);
+
+                RemoteIterator<LocatedFileStatus> files = fs.listFiles(fs.getHomeDirectory(), false);
+                while (files.hasNext()) {
+                    LocatedFileStatus file = files.next();
+                    LocalResource localResource = Records.newRecord(LocalResource.class);
+
+                    localResource.setType(LocalResourceType.FILE);
+                    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+                    localResource.setResource(ConverterUtils.getYarnUrlFromPath(file.getPath()));
+                    localResource.setTimestamp(file.getModificationTime());
+                    localResource.setSize(file.getLen());
+                    localResources.put(file.getPath().getName(), localResource);
+                }
+                ctx.setLocalResources(localResources);
+
+            } catch (IOException e1) {
+                logger.error("Cannot register the local resources for the container", e1);
+            }
+
+            StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+
+            for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+                classPathEnv.append(':');
+                classPathEnv.append(c.trim());
+            }
+
+            // classPathEnv.append(System.getProperty("java.class.path"));
+            Map<String, String> env = new HashMap<String, String>();
+
+            env.put("CLASSPATH", classPathEnv.toString());
+            ctx.setEnvironment(env);
+
+            // Set the necessary command to execute on the allocated container
+            Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+
+            vargs.add("java");
+            // TODO add memory parameter
+            // vargs.add("-Xdebug");
+            // vargs.add("-Xrunjdwp:transport=dt_socket,address=8889,server=y");
+            vargs.add("org.apache.s4.core.Main");
+            vargs.add("-zk=" + zkString);
+            vargs.add("-c=" + cluster);
+
+            // add module for fetching S4R archives from HDFS
+            vargs.add("-emc=" + HdfsFetcherModule.class.getName());
+
+            // add reference to the configuration
+            if (testMode) {
+                vargs.add("-p=" + FileSystem.FS_DEFAULT_NAME_KEY + "=" + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            }
+
+            // TODO
+            // We should redirect the output to hdfs instead of local logs
+            // so as to be able to look at the final output after the containers
+            // have been released.
+            // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err]
+            vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+            vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+            // Get final command
+            StringBuilder command = new StringBuilder();
+            for (CharSequence str : vargs) {
+                command.append(str).append(" ");
+            }
+
+            List<String> commands = new ArrayList<String>();
+            commands.add(command.toString());
+            ctx.setCommands(commands);
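+            // For illustration only (the ZooKeeper host and cluster name below are hypothetical), the
+            // assembled command resembles:
+            //   java org.apache.s4.core.Main -zk=zkhost:2181 -c=cluster1
+            //       -emc=org.apache.s4.deploy.HdfsFetcherModule 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr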
+
+            StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+            startReq.setContainerLaunchContext(ctx);
+            try {
+                cm.startContainer(startReq);
+            } catch (YarnRemoteException e) {
+                logger.error("Start container failed for containerId=" + container.getId(), e);
+            }
+
+        }
+    }
+
+    /**
+     * Connect to the Resource Manager
+     * 
+     * @return Handle to communicate with the RM
+     */
+    private AMRMProtocol connectToRM() {
+        YarnConfiguration yarnConf = new YarnConfiguration(conf);
+        InetSocketAddress rmAddress = yarnConf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+                YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+        logger.info("Connecting to ResourceManager at " + rmAddress);
+        return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
+    }
+
+    /**
+     * Register the Application Master to the Resource Manager
+     * 
+     * @return the registration response from the RM
+     * @throws YarnRemoteException
+     */
+    private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
+        RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
+
+        // set the required info into the registration request:
+        // application attempt id,
+        // host on which the app master is running
+        // rpc port on which the app master accepts requests from the client
+        // tracking url for the app master
+        appMasterRequest.setApplicationAttemptId(appAttemptID);
+        appMasterRequest.setHost(appMasterHostname);
+        appMasterRequest.setRpcPort(appMasterRpcPort);
+        appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+
+        return resourceManager.registerApplicationMaster(appMasterRequest);
+    }
+
+    /**
+     * Setup the request that will be sent to the RM for the container ask.
+     * 
+     * @param numContainers
+     *            Containers to ask for from RM
+     * @return the setup ResourceRequest to be sent to RM
+     */
+    private ResourceRequest setupContainerAskForRM(int numContainers) {
+        ResourceRequest request = Records.newRecord(ResourceRequest.class);
+
+        // setup requirements for hosts
+        // whether a particular rack/host is needed
+        // Refer to apis under org.apache.hadoop.net for more
+        // details on how to get figure out rack/host mapping.
+        // using * since any host will do for the S4 nodes
+        request.setHostName("*");
+
+        // set no. of containers needed
+        request.setNumContainers(numContainers);
+
+        // set the priority for the request
+        Priority pri = Records.newRecord(Priority.class);
+        // TODO - what is the range for priority? how to decide?
+        pri.setPriority(requestPriority);
+        request.setPriority(pri);
+
+        // Set up resource type requirements
+        // For now, only memory is supported so we set memory requirements
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(containerMemory);
+        request.setCapability(capability);
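+        // For instance (hypothetical values): numContainers=2 with containerMemory=512 asks the RM for two
+        // 512 MB containers on any host ("*") at the configured priority.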
+
+        return request;
+    }
+
+    /**
+     * Ask the RM to allocate the given number of containers to this Application Master
+     * 
+     * @param requestedContainers
+     *            Containers to ask for from RM
+     * @return Response from RM to AM with allocated containers
+     * @throws YarnRemoteException
+     */
+    private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers) throws YarnRemoteException {
+        AllocateRequest req = Records.newRecord(AllocateRequest.class);
+        req.setResponseId(rmRequestID.incrementAndGet());
+        req.setApplicationAttemptId(appAttemptID);
+        req.addAllAsks(requestedContainers);
+        req.addAllReleases(releasedContainers);
+        req.setProgress((float) numCompletedContainers.get() / numTotalContainers);
+
+        logger.info("Sending request to RM for containers" + ", requestedSet=" + requestedContainers.size()
+                + ", releasedSet=" + releasedContainers.size() + ", progress=" + req.getProgress());
+
+        for (ResourceRequest rsrcReq : requestedContainers) {
+            logger.info("Requested container ask: " + rsrcReq.toString());
+        }
+        for (ContainerId id : releasedContainers) {
+            logger.info("Released container, id=" + id.getId());
+        }
+
+        AllocateResponse resp = resourceManager.allocate(req);
+        return resp.getAMResponse();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
new file mode 100644
index 0000000..3a3aac6
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools.yarn;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.s4.tools.DefineCluster;
+import org.apache.s4.tools.Deploy;
+import org.apache.s4.tools.Tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Client for S4 application submission to YARN.
+ * 
+ * <p>
+ * It connects to an existing YARN infrastructure (based on the configuration files in the directory referenced by
+ * the HADOOP_CONF_DIR environment variable), then launches an S4 application master, passing all relevant
+ * configuration. The application master then requests resources from the resource manager and creates S4 nodes on
+ * the allocated resources, with the appropriate configuration.
+ * </p>
+ * 
+ * <p>
+ * <u>Implementation notes from YARN example: </u>
+ * </p>
+ * <p>
+ * To submit an application, a client first needs to connect to the <code>ResourceManager</code> aka ApplicationsManager
+ * or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol} provides a way for the client to get access to
+ * cluster information and to request for a new {@link ApplicationId}.
+ * </p>
+ * 
+ * <p>
+ * For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. The
+ * {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} and application
+ * name, user submitting the application, the priority assigned to the application and the queue to which this
+ * application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext} also defines the
+ * {@link ContainerLaunchContext} which describes the <code>Container</code> with which the {@link S4ApplicationMaster}
+ * is launched.
+ * </p>
+ * 
+ * <p>
+ * The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
+ * {@link S4ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available and the
+ * environment to be set for the {@link S4ApplicationMaster} and the commands to be executed to run the
+ * {@link S4ApplicationMaster}.
+ * </p>
+ * 
+ * <p>
+ * Using the {@link ApplicationSubmissionContext}, the client submits the application to the
+ * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> for an
+ * {@link ApplicationReport} at regular time intervals. If the application takes too long, the client kills it by
+ * submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>.
+ * </p>
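+ * 
+ * <p>
+ * Usage sketch (the flag spellings and values below are illustrative, not authoritative; see {@link YarnArgs} for
+ * the actual options):
+ * </p>
+ * 
+ * <pre>
+ * s4 yarn -c=cluster1 -zk=zkhost:2181 -s4r=hdfs://namenode/apps/myApp.s4r -num_containers=2 -container_memory=512
+ * </pre>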
+ * 
+ */
+public class S4YarnClient extends YarnClientImpl {
+
+    private static final String HADOOP_CONF_DIR_ENV = System.getenv("HADOOP_CONF_DIR");
+
+    private static final ImmutableSet<String> YARN_CONF_FILES = ImmutableSet.of("core-site.xml", "hdfs-site.xml",
+            "yarn-site.xml", "mapred-site.xml");
+
+    private static Logger logger = LoggerFactory.getLogger(S4YarnClient.class);
+
+    // Configuration
+    private Configuration conf;
+
+    YarnArgs yarnArgs;
+
+    // Handle to talk to the Resource Manager/Applications Manager
+    private ClientRMProtocol applicationsManager;
+
+    private int amMemory;
+
+    private final long clientStartTime = System.currentTimeMillis();
+
+    /**
+     * @param args
+     *            Command line arguments
+     */
+    public static void main(String[] args) {
+
+        YarnArgs yarnArgs = new YarnArgs();
+        logger.info("S4YarnClient args = " + Arrays.toString(args));
+
+        Tools.parseArgs(yarnArgs, args);
+        boolean result = false;
+        try {
+
+            YarnConfiguration yarnConfig = new YarnConfiguration();
+
+            if (Strings.isNullOrEmpty(HADOOP_CONF_DIR_ENV)) {
+                logger.error("You must define HADOOP_CONF_DIR environment variable");
+                System.exit(1);
+            }
+            File confDir = new File(HADOOP_CONF_DIR_ENV);
+            if (!(confDir.listFiles(new FileFilter() {
+
+                @Override
+                public boolean accept(File pathname) {
+                    return YARN_CONF_FILES.contains(pathname.getName());
+                }
+            }).length == 4)) {
+                logger.error("The {} directory must contain files [core,hdfs,yarn,mapred]-site.xml");
+                System.exit(1);
+            }
+
+            for (String fileName : YARN_CONF_FILES) {
+                yarnConfig.addResource(new Path(new File(HADOOP_CONF_DIR_ENV, fileName).toURI()));
+            }
+
+            S4YarnClient client = new S4YarnClient(yarnArgs, yarnConfig);
+            result = client.run(false);
+        } catch (Throwable t) {
+            logger.error("Error running Client", t);
+            System.exit(1);
+        }
+        if (result) {
+            logger.info("Application completed successfully");
+            System.exit(0);
+        }
+        logger.error("Application failed to complete successfully");
+        System.exit(1);
+    }
+
+    public S4YarnClient(YarnArgs yarnArgs, Configuration conf) throws Exception {
+        this.yarnArgs = yarnArgs;
+        this.conf = conf;
+        init(this.conf);
+    }
+
+    /**
+     * Main run function for the client
+     * 
+     * @param testMode
+     *            when true, passes test-specific settings to the application master
+     * @return true if application completed successfully
+     * @throws IOException
+     */
+    public boolean run(boolean testMode) throws IOException {
+        logger.info("Running Client");
+        start();
+
+        YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
+        logger.info("Got Cluster metric info from ASM" + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
+
+        List<NodeReport> clusterNodeReports = super.getNodeReports();
+        logger.info("Got Cluster node info from ASM");
+        for (NodeReport node : clusterNodeReports) {
+            logger.info("Got node report from ASM for" + ", nodeId=" + node.getNodeId() + ", nodeAddress"
+                    + node.getHttpAddress() + ", nodeRackName" + node.getRackName() + ", nodeNumContainers"
+                    + node.getNumContainers() + ", nodeHealthStatus" + node.getNodeHealthStatus());
+        }
+
+        QueueInfo queueInfo = super.getQueueInfo(yarnArgs.queue);
+        logger.info("Queue info" + ", queueName=" + queueInfo.getQueueName() + ", queueCurrentCapacity="
+                + queueInfo.getCurrentCapacity() + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+                + ", queueApplicationCount=" + queueInfo.getApplications().size() + ", queueChildQueueCount="
+                + queueInfo.getChildQueues().size());
+
+        List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();
+        for (QueueUserACLInfo aclInfo : listAclInfo) {
+            for (QueueACL userAcl : aclInfo.getUserAcls()) {
+                logger.info("User ACL Info for Queue" + ", queueName=" + aclInfo.getQueueName() + ", userAcl="
+                        + userAcl.name());
+            }
+        }
+
+        // Get a new application id
+        GetNewApplicationResponse newApp = super.getNewApplication();
+        ApplicationId appId = newApp.getApplicationId();
+
+        // TODO get min/max resource capabilities from RM and change memory ask if needed
+        // If we do not have min/max, we may not be able to correctly request
+        // the required resources from the RM for the app master
+        // Memory ask has to be a multiple of min and less than max.
+        // Dump out information about cluster capability as seen by the resource manager
+        int minMem = newApp.getMinimumResourceCapability().getMemory();
+        int maxMem = newApp.getMaximumResourceCapability().getMemory();
+        logger.info("Min mem capability of resources in this cluster " + minMem);
+        logger.info("Max mem capability of resources in this cluster " + maxMem);
+
+        // A resource ask has to be at least the minimum capability of the cluster; the value has to be
+        // a multiple of the min value and cannot exceed the max.
+        // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
+        if (amMemory < minMem) {
+            logger.info("AM memory specified below min threshold of cluster. Using min value." + ", specified="
+                    + amMemory + ", min=" + minMem);
+            amMemory = minMem;
+        } else if (amMemory > maxMem) {
+            logger.info("AM memory specified above max threshold of cluster. Using max value." + ", specified="
+                    + amMemory + ", max=" + maxMem);
+            amMemory = maxMem;
+        }
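+        // Worked example (numbers are hypothetical): with minMem=1024 and maxMem=8192, a requested amMemory of 512
+        // is raised to 1024, and a value of 1500, not being an exact multiple of min, would be normalized by the RM
+        // to a multiple of 1024, as noted above.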
+
+        // Create launch context for app master
+        logger.info("Setting up application submission context for ASM");
+        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+        // set the application id
+        appContext.setApplicationId(appId);
+        // set the application name
+        appContext.setApplicationName(yarnArgs.appName);
+
+        // Set up the container launch context for the application master
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+        // set local resources for the application master
+        // local files or archives as needed
+        // In this scenario, the jar file for the application master is part of the local resources
+        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+        if (!(new File(yarnArgs.s4Dir).isDirectory())) {
+            logger.error("Invalid s4 directory : " + yarnArgs.s4Dir);
+            System.exit(1);
+        }
+
+        // TODO avoid depending on source distribution paths
+        File libDir = new File(yarnArgs.s4Dir, "subprojects/s4-yarn/build/install/s4-yarn/lib");
+        File[] classPathFiles = libDir.listFiles();
+        if (classPathFiles == null) {
+            logger.error("Cannot list classpath resources in directory: " + libDir.getAbsolutePath());
+            System.exit(1);
+        }
+        FileSystem fs = FileSystem.get(conf);
+
+        for (File classPathFile : classPathFiles) {
+            Path dest = copyToLocalResources(fs, localResources, classPathFile);
+            logger.info("Copied classpath resource " + classPathFile.getAbsolutePath() + " to "
+                    + dest.toUri().toString());
+        }
+
+        // Set local resource info into app master container launch context
+        amContainer.setLocalResources(localResources);
+
+        // Set the necessary security tokens as needed
+        // amContainer.setContainerTokens(containerToken);
+
+        // Set the env variables to be setup in the env where the application master will be run
+        logger.info("Set the environment for the application master");
+        Map<String, String> env = new HashMap<String, String>();
+
+        // For now setting all required classpaths including
+        // the classpath to "." for the application jar
+        StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            classPathEnv.append(':');
+            classPathEnv.append(c.trim());
+        }
+        classPathEnv.append(":./log4j.properties");
+
+        env.put("CLASSPATH", classPathEnv.toString());
+
+        amContainer.setEnvironment(env);
+
+        // Set the necessary command to execute the application master
+        Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+        // Set java executable command
+        logger.info("Setting up app master command");
+
+        // vargs.add("${JAVA_HOME}" + "/bin/java");
+        vargs.add("java");
+        // Set Xmx based on am memory size
+        vargs.add("-Xmx" + amMemory + "m");
+        // vargs.add("-Xdebug");
+        // vargs.add("-Xrunjdwp:transport=dt_socket,address=8888,server=y");
+        // Set Application Master class name
+        vargs.add(S4ApplicationMaster.class.getName());
+        // Set params for Application Master
+        vargs.add("--container_memory " + String.valueOf(yarnArgs.containerMemory));
+        vargs.add("--num_containers " + String.valueOf(yarnArgs.numContainers));
+        vargs.add("--priority " + String.valueOf(yarnArgs.priority));
+        vargs.add("-c " + String.valueOf(yarnArgs.cluster));
+        vargs.add("-zk " + String.valueOf(yarnArgs.zkString));
+        if (testMode) {
+            vargs.add("-test");
+        }
+
+        if (yarnArgs.debug) {
+            vargs.add("--debug");
+        }
+
+        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+        // Get final command
+        StringBuilder command = new StringBuilder();
+        for (CharSequence str : vargs) {
+            command.append(str).append(" ");
+        }
+
+        logger.info("Completed setting up app master command " + command.toString());
+        List<String> commands = new ArrayList<String>();
+        commands.add(command.toString());
+        amContainer.setCommands(commands);
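+        // For illustration only (memory size and parameter values below are hypothetical), the assembled command
+        // resembles:
+        //   java -Xmx512m org.apache.s4.tools.yarn.S4ApplicationMaster --container_memory 512 --num_containers 2
+        //       --priority 0 -c cluster1 -zk zkhost:2181 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr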
+
+        // Set up resource type requirements
+        // For now, only memory is supported so we set memory requirements
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(amMemory);
+        amContainer.setResource(capability);
+
+        appContext.setAMContainerSpec(amContainer);
+
+        // Set the priority for the application master
+        Priority pri = Records.newRecord(Priority.class);
+        // TODO - what is the range for priority? how to decide?
+        pri.setPriority(yarnArgs.priority);
+        appContext.setPriority(pri);
+
+        // Set the queue to which this application is to be submitted in the RM
+        appContext.setQueue(yarnArgs.queue);
+        // Set the user submitting this application
+        // TODO can it be empty?
+        appContext.setUser(yarnArgs.user);
+
+        // Define a new cluster for the application
+        String[] defineClusterArgs = { "-cluster=" + yarnArgs.cluster, "-nbTasks=" + yarnArgs.nbTasks,
+                "-flp=" + yarnArgs.flp, "-zk=" + yarnArgs.zkString };
+        DefineCluster.main(defineClusterArgs);
+
+        // Deploy the application to the new cluster
+        String[] deployApplicationArgs = { "-s4r=" + yarnArgs.s4rPath, "-cluster=" + yarnArgs.cluster,
+                "-appName=" + yarnArgs.cluster, "-shutdown=false", "-zk=" + yarnArgs.zkString };
+        Deploy.main(deployApplicationArgs);
+
+        logger.info("Submitting application to ASM");
+        super.submitApplication(appContext);
+
+        // TODO
+        // Try submitting the same request again
+        // app submission failure?
+
+        // Monitor the application (TODO: optional?)
+
+        return monitorApplication(appId);
+
+    }
+
+    private Path copyToLocalResources(FileSystem fs, Map<String, LocalResource> localResources, File file)
+            throws IOException {
+        Path src = new Path(file.getAbsolutePath());
+
+        // TODO use home directory + appId / appName?
+        Path dst = new Path(fs.getHomeDirectory(), file.getName());
+        fs.copyFromLocalFile(false, true, src, dst);
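+        // For example (hypothetical file name): a local file lib/s4-yarn.jar is copied to
+        // <home dir>/s4-yarn.jar and registered below under the key "s4-yarn.jar", which YARN localizes into the
+        // container's working directory.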
+        FileStatus destStatus = fs.getFileStatus(dst);
+        LocalResource resource = Records.newRecord(LocalResource.class);
+        resource.setType(LocalResourceType.FILE);
+        resource.setVisibility(LocalResourceVisibility.APPLICATION);
+        resource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+        // Set timestamp and length of file so that the framework
+        // can do basic sanity checks for the local resource
+        // after it has been copied over to ensure it is the same
+        // resource the client intended to use with the application
+        resource.setTimestamp(destStatus.getModificationTime());
+        resource.setSize(destStatus.getLen());
+        localResources.put(file.getName(), resource);
+        return dst;
+    }
+
+    /**
+     * Monitor the submitted application for completion. Kill application if time expires.
+     * 
+     * @param appId
+     *            Application Id of application to be monitored
+     * @return true if application completed successfully
+     * @throws YarnRemoteException
+     */
+    private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
+
+        while (true) {
+
+            // Check app status every 1 second.
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.debug("Thread sleep in monitoring loop interrupted");
+            }
+
+            // Get application report for the appId we are interested in
+            ApplicationReport report = super.getApplicationReport(appId);
+
+            logger.info("Got application report from ASM for" + ", appId=" + appId.getId() + ", clientToken="
+                    + report.getClientToken() + ", appDiagnostics=" + report.getDiagnostics() + ", appMasterHost="
+                    + report.getHost() + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+                    + report.getRpcPort() + ", appStartTime=" + report.getStartTime() + ", yarnAppState="
+                    + report.getYarnApplicationState().toString() + ", distributedFinalState="
+                    + report.getFinalApplicationStatus().toString() + ", appTrackingUrl=" + report.getTrackingUrl()
+                    + ", appUser=" + report.getUser());
+
+            YarnApplicationState state = report.getYarnApplicationState();
+            FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+            if (YarnApplicationState.FINISHED == state) {
+                if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+                    logger.info("Application has completed successfully. Breaking monitoring loop");
+                    return true;
+                } else {
+                    logger.info("Application did finished unsuccessfully." + " YarnState=" + state.toString()
+                            + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+                    return false;
+                }
+            } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
+                logger.info("Application did not finish." + " YarnState=" + state.toString() + ", DSFinalStatus="
+                        + dsStatus.toString() + ". Breaking monitoring loop");
+                return false;
+            }
+
+            if ((yarnArgs.timeout != -1) && (System.currentTimeMillis() > (clientStartTime + yarnArgs.timeout))) {
+                logger.info("Reached client specified timeout for application. Killing application");
+                forceKillApplication(appId);
+                return false;
+            }
+        }
+
+    }
+
+    /**
+     * Kill a submitted application by sending a call to the Applications Manager
+     * 
+     * @param appId
+     *            Application Id to be killed.
+     * @throws YarnRemoteException
+     */
+    private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
+        super.killApplication(appId);
+    }
+
+}