You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/01/07 23:30:54 UTC

[2/2] git commit: Adding support to use YARN for provisioning

Adding support to use YARN for provisioning


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/d23198c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/d23198c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/d23198c7

Branch: refs/heads/helix-provisioning
Commit: d23198c760e1d87f5d4a3c19d2ec57bdf4d3dfb0
Parents: 3153c5e
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Tue Jan 7 14:30:45 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Tue Jan 7 14:30:45 2014 -0800

----------------------------------------------------------------------
 .../controller/provisioner/ContainerSpec.java   |   2 +
 helix-provisioning/.gitignore                   |  16 +
 helix-provisioning/DISCLAIMER                   |  15 +
 helix-provisioning/LICENSE                      | 273 ++++++
 helix-provisioning/NOTICE                       |  30 +
 helix-provisioning/pom.xml                      | 116 +++
 helix-provisioning/src/assemble/assembly.xml    |  60 ++
 .../src/main/config/log4j.properties            |  31 +
 .../provisioning/yarn/ApplicationMaster.java    | 889 +++++++++++++++++++
 .../provisioning/yarn/ApplicationSpec.java      |  19 +
 .../apache/helix/provisioning/yarn/Client.java  | 692 +++++++++++++++
 .../provisioning/yarn/ContainerAskResponse.java |  17 +
 .../yarn/ContainerLaunchResponse.java           |   5 +
 .../yarn/ContainerReleaseResponse.java          |   5 +
 .../yarn/ContainerStopResponse.java             |   5 +
 .../helix/provisioning/yarn/DSConstants.java    |  47 +
 .../yarn/GenericApplicationMaster.java          | 306 +++++++
 .../yarn/LaunchContainerRunnable.java           |  79 ++
 .../provisioning/yarn/NMCallbackHandler.java    |  74 ++
 .../provisioning/yarn/RMCallbackHandler.java    | 109 +++
 .../yarn/YarnApplicationMaster.java             |  16 +
 .../provisioning/yarn/YarnProvisioner.java      | 120 +++
 helix-provisioning/src/test/conf/testng.xml     |  27 +
 pom.xml                                         |   1 +
 24 files changed, 2954 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
index e814059..b393a64 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
@@ -24,6 +24,8 @@ public class ContainerSpec {
    * Some unique id representing the container.
    */
   ContainerId containerId;
+  
+  String memory;
 
   public ContainerSpec(ContainerId containerId) {
     this.containerId = containerId;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/.gitignore
----------------------------------------------------------------------
diff --git a/helix-provisioning/.gitignore b/helix-provisioning/.gitignore
new file mode 100644
index 0000000..2411bd8
--- /dev/null
+++ b/helix-provisioning/.gitignore
@@ -0,0 +1,16 @@
+/target
+/.project
+/.classpath
+/.settings
+/zkdata
+/test-output
+/src/main/scripts/integration-test/var
+#/src/test/java/com/linkedin/dds/
+#/src/main/scripts/integration-test/config
+/src/main/scripts/target/
+/src/main/scripts/integration-test/script/.metadata_infra
+#/src/main/scripts/integration-test/script/dds_driver.py
+#/src/main/scripts/integration-test/script/pexpect.py
+#/src/main/scripts/integration-test/script/utility.py
+*.pyc
+/bin

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/DISCLAIMER
----------------------------------------------------------------------
diff --git a/helix-provisioning/DISCLAIMER b/helix-provisioning/DISCLAIMER
new file mode 100644
index 0000000..2001d31
--- /dev/null
+++ b/helix-provisioning/DISCLAIMER
@@ -0,0 +1,15 @@
+Apache Helix is an effort undergoing incubation at the Apache Software 
+Foundation (ASF), sponsored by the Apache Incubator PMC. 
+
+Incubation is required of all newly accepted projects until a further review 
+indicates that the infrastructure, communications, and decision making process 
+have stabilized in a manner consistent with other successful ASF projects. 
+
+While incubation status is not necessarily a reflection of the completeness 
+or stability of the code, it does indicate that the project has yet to be 
+fully endorsed by the ASF.
+
+For more information about the incubation status of the Apache Helix project you
+can go to the following page:
+
+http://incubator.apache.org/projects/helix.html

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/LICENSE
----------------------------------------------------------------------
diff --git a/helix-provisioning/LICENSE b/helix-provisioning/LICENSE
new file mode 100644
index 0000000..413913f
--- /dev/null
+++ b/helix-provisioning/LICENSE
@@ -0,0 +1,273 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+
+For xstream:
+
+Copyright (c) 2003-2006, Joe Walnes
+Copyright (c) 2006-2009, 2011 XStream Committers
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or other materials provided
+with the distribution.
+
+3. Neither the name of XStream nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+for jline:
+
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/NOTICE
----------------------------------------------------------------------
diff --git a/helix-provisioning/NOTICE b/helix-provisioning/NOTICE
new file mode 100644
index 0000000..e070e15
--- /dev/null
+++ b/helix-provisioning/NOTICE
@@ -0,0 +1,30 @@
+Apache Helix
+Copyright 2012 The Apache Software Foundation
+
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+Codehaus (http://www.codehaus.org/)
+Licensed under the BSD License.
+
+This product includes software developed at
+jline (http://jline.sourceforge.net/ )
+Licensed under the BSD License.
+
+This product includes software developed at
+josql (http://sourceforge.net/projects/josql/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+restlet (http://www.restlet.org/about/legal).
+Licensed under the Apache License 2.0.
+
+
+II. License Summary
+- Apache License 2.0
+- BSD License

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
new file mode 100644
index 0000000..1f5a585
--- /dev/null
+++ b/helix-provisioning/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.helix</groupId>
+    <artifactId>helix</artifactId>
+    <version>0.7.1-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>helix-provisioning</artifactId>
+  <packaging>bundle</packaging>
+  <name>Apache Helix :: HelixAgent</name>
+
+  <properties>
+    <hadoop.version>2.2.0</hadoop.version>
+    <osgi.import>
+      org.apache.helix*,
+      org.apache.commons.cli;version="[1.2,2)",
+      org.apache.log4j,
+      *
+    </osgi.import>
+    <osgi.export>org.apache.helix.provisioning.yarn*;version="${project.version};-noimport:=true</osgi.export>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+    </dependency>
+   <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <configuration>
+          <programs>
+            <program>
+              <mainClass>org.apache.helix.provisioning.HelixAgentMain</mainClass>
+              <name>start-helix-provisioning</name>
+            </program>
+          </programs>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/assemble/assembly.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/assemble/assembly.xml b/helix-provisioning/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/helix-provisioning/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+  <id>pkg</id>
+  <formats>
+    <format>tar</format>
+  </formats>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+      <outputDirectory>repo</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+      <excludes>
+        <exclude>**/*.xml</exclude>
+      </excludes>
+    </fileSet>
+     <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>LICENSE</include>
+        <include>NOTICE</include>
+        <include>DISCLAIMER</include>
+      </includes>
+      <fileMode>0755</fileMode>
+    </fileSet>
+  </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/config/log4j.properties b/helix-provisioning/src/main/config/log4j.properties
new file mode 100644
index 0000000..91fac03
--- /dev/null
+++ b/helix-provisioning/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
new file mode 100644
index 0000000..d63b300
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
@@ -0,0 +1,889 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.provisioning.yarn;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * An ApplicationMaster for executing shell commands on a set of launched
+ * containers using the YARN framework.
+ * 
+ * <p>
+ * This class is meant to act as an example on how to write yarn-based
+ * application masters.
+ * </p>
+ * 
+ * <p>
+ * The ApplicationMaster is started on a container by the
+ * <code>ResourceManager</code>'s launcher. The first thing that the
+ * <code>ApplicationMaster</code> needs to do is to connect and register itself
+ * with the <code>ResourceManager</code>. The registration sets up information
+ * within the <code>ResourceManager</code> regarding what host:port the
+ * ApplicationMaster is listening on to provide any form of functionality to a
+ * client as well as a tracking url that a client can use to keep track of
+ * status/job history if needed. However, in the distributedshell, trackingurl
+ * and appMasterHost:appMasterRpcPort are not supported.
+ * </p>
+ * 
+ * <p>
+ * The <code>ApplicationMaster</code> needs to send a heartbeat to the
+ * <code>ResourceManager</code> at regular intervals to inform the
+ * <code>ResourceManager</code> that it is up and alive. The
+ * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
+ * <code>ApplicationMaster</code> acts as a heartbeat.
+ * 
+ * <p>
+ * For the actual handling of the job, the <code>ApplicationMaster</code> has to
+ * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
+ * required no. of containers using {@link ResourceRequest} with the necessary
+ * resource specifications such as node location, computational
+ * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
+ * responds with an {@link AllocateResponse} that informs the
+ * <code>ApplicationMaster</code> of the set of newly allocated containers,
+ * completed containers as well as current state of available resources.
+ * </p>
+ * 
+ * <p>
+ * For each allocated container, the <code>ApplicationMaster</code> can then set
+ * up the necessary launch context via {@link ContainerLaunchContext} to specify
+ * the allocated container id, local resources required by the executable, the
+ * environment to be setup for the executable, commands to execute, etc. and
+ * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
+ * launch and execute the defined commands on the given allocated container.
+ * </p>
+ * 
+ * <p>
+ * The <code>ApplicationMaster</code> can monitor the launched container by
+ * either querying the <code>ResourceManager</code> using
+ * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
+ * the {@link ContainerManagementProtocol} by querying for the status of the allocated
+ * container's {@link ContainerId}.
+ *
+ * <p>
+ * After the job has been completed, the <code>ApplicationMaster</code> has to
+ * send a {@link FinishApplicationMasterRequest} to the
+ * <code>ResourceManager</code> to inform it that the
+ * <code>ApplicationMaster</code> has been completed.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ApplicationMaster {
+
+  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+  // Configuration
+  private Configuration conf;
+
+  // Handle to communicate with the Resource Manager
+  @SuppressWarnings("rawtypes")
+  private AMRMClientAsync amRMClient;
+
+  // Handle to communicate with the Node Manager
+  private NMClientAsync nmClientAsync;
+  // Listen to process the response from the Node Manager
+  private NMCallbackHandler containerListener;
+  
+  // Application Attempt Id ( combination of attemptId and fail count )
+  private ApplicationAttemptId appAttemptID;
+
+  // TODO
+  // For status update for clients - yet to be implemented
+  // Hostname of the container
+  private String appMasterHostname = "";
+  // Port on which the app master listens for status updates from clients
+  private int appMasterRpcPort = -1;
+  // Tracking url to which app master publishes info for clients to monitor
+  private String appMasterTrackingUrl = "";
+
+  // App Master configuration
+  // No. of containers to run shell command on
+  private int numTotalContainers = 1;
+  // Memory to request for the container on which the shell command will run
+  private int containerMemory = 10;
+  // Priority of the request
+  private int requestPriority;
+
+  // Counter for completed containers ( complete denotes successful or failed )
+  private AtomicInteger numCompletedContainers = new AtomicInteger();
+  // Allocated container count so that we know how many containers has the RM
+  // allocated to us
+  private AtomicInteger numAllocatedContainers = new AtomicInteger();
+  // Count of failed containers
+  private AtomicInteger numFailedContainers = new AtomicInteger();
+  // Count of containers already requested from the RM
+  // Needed as once requested, we should not request for containers again.
+  // Only request for more if the original requirement changes.
+  private AtomicInteger numRequestedContainers = new AtomicInteger();
+
+  // Shell command to be executed
+  private String shellCommand = "";
+  // Args to be passed to the shell command
+  private String shellArgs = "";
+  // Env variables to be setup for the shell command
+  private Map<String, String> shellEnv = new HashMap<String, String>();
+
+  // Location of shell script ( obtained from info set in env )
+  // Shell script path in fs
+  private String shellScriptPath = "";
+  // Timestamp needed for creating a local resource
+  private long shellScriptPathTimestamp = 0;
+  // File length needed for local resource
+  private long shellScriptPathLen = 0;
+
+  // Hardcoded path to shell script in launch container's local env
+  private final String ExecShellStringPath = "ExecShellScript.sh";
+
+  private volatile boolean done;
+  private volatile boolean success;
+
+  private ByteBuffer allTokens;
+
+  // Launch threads
+  private List<Thread> launchThreads = new ArrayList<Thread>();
+
+  /**
+   * @param args Command line args
+   */
+  public static void main(String[] args) {
+    boolean result = false;
+    try {
+      ApplicationMaster appMaster = new ApplicationMaster();
+      LOG.info("Initializing ApplicationMaster");
+      LOG.info("classpath:" + System.getProperty("java.class.path"));
+      boolean doRun = appMaster.init(args);
+      if (!doRun) {
+        System.exit(0);
+      }
+      result = appMaster.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running ApplicationMaster", t);
+      System.exit(1);
+    }
+    if (result) {
+      LOG.info("Application Master completed successfully. exiting");
+      System.exit(0);
+    } else {
+      LOG.info("Application Master failed. exiting");
+      System.exit(2);
+    }
+  }
+
+  /**
+   * Dump out contents of $CWD and the environment to stdout for debugging
+   */
+  private void dumpOutDebugInfo() {
+
+    LOG.info("Dump debug output");
+    Map<String, String> envs = System.getenv();
+    for (Map.Entry<String, String> env : envs.entrySet()) {
+      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
+      System.out.println("System env: key=" + env.getKey() + ", val="
+          + env.getValue());
+    }
+
+    String cmd = "ls -al";
+    Runtime run = Runtime.getRuntime();
+    Process pr = null;
+    try {
+      pr = run.exec(cmd);
+      pr.waitFor();
+
+      BufferedReader buf = new BufferedReader(new InputStreamReader(
+          pr.getInputStream()));
+      String line = "";
+      while ((line = buf.readLine()) != null) {
+        LOG.info("System CWD content: " + line);
+        System.out.println("System CWD content: " + line);
+      }
+      buf.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public ApplicationMaster() {
+    // Set up the configuration
+    conf = new YarnConfiguration();
+  }
+
+  /**
+   * Parse command line options
+   *
+   * @param args Command line args
+   * @return Whether init successful and run should be invoked
+   * @throws ParseException
+   * @throws IOException
+   */
+  public boolean init(String[] args) throws ParseException, IOException {
+
+    Options opts = new Options();
+    opts.addOption("app_attempt_id", true,
+        "App Attempt ID. Not to be used unless for testing purposes");
+    opts.addOption("shell_command", true,
+        "Shell command to be executed by the Application Master");
+    opts.addOption("shell_script", true,
+        "Location of the shell script to be executed");
+    opts.addOption("shell_args", true, "Command line args for the shell script");
+    opts.addOption("shell_env", true,
+        "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("container_memory", true,
+        "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("num_containers", true,
+        "No. of containers on which the shell command needs to be executed");
+    opts.addOption("priority", true, "Application Priority. Default 0");
+    opts.addOption("debug", false, "Dump out debug information");
+
+    opts.addOption("help", false, "Print usage");
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (args.length == 0) {
+      printUsage(opts);
+      throw new IllegalArgumentException(
+          "No args specified for application master to initialize");
+    }
+
+    if (cliParser.hasOption("help")) {
+      printUsage(opts);
+      return false;
+    }
+
+    if (cliParser.hasOption("debug")) {
+      dumpOutDebugInfo();
+    }
+
+    Map<String, String> envs = System.getenv();
+
+    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
+      if (cliParser.hasOption("app_attempt_id")) {
+        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
+        appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
+      } else {
+        throw new IllegalArgumentException(
+            "Application Attempt Id not set in the environment");
+      }
+    } else {
+      ContainerId containerId = ConverterUtils.toContainerId(envs
+          .get(Environment.CONTAINER_ID.name()));
+      appAttemptID = containerId.getApplicationAttemptId();
+    }
+
+    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
+      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
+          + " not set in the environment");
+    }
+    if (!envs.containsKey(Environment.NM_HOST.name())) {
+      throw new RuntimeException(Environment.NM_HOST.name()
+          + " not set in the environment");
+    }
+    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
+      throw new RuntimeException(Environment.NM_HTTP_PORT
+          + " not set in the environment");
+    }
+    if (!envs.containsKey(Environment.NM_PORT.name())) {
+      throw new RuntimeException(Environment.NM_PORT.name()
+          + " not set in the environment");
+    }
+
+    LOG.info("Application master for app" + ", appId="
+        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
+        + appAttemptID.getApplicationId().getClusterTimestamp()
+        + ", attemptId=" + appAttemptID.getAttemptId());
+
+    if (!cliParser.hasOption("shell_command")) {
+      throw new IllegalArgumentException(
+          "No shell command specified to be executed by application master");
+    }
+    shellCommand = cliParser.getOptionValue("shell_command");
+
+    if (cliParser.hasOption("shell_args")) {
+      shellArgs = cliParser.getOptionValue("shell_args");
+    }
+    if (cliParser.hasOption("shell_env")) {
+      String shellEnvs[] = cliParser.getOptionValues("shell_env");
+      for (String env : shellEnvs) {
+        env = env.trim();
+        int index = env.indexOf('=');
+        if (index == -1) {
+          shellEnv.put(env, "");
+          continue;
+        }
+        String key = env.substring(0, index);
+        String val = "";
+        if (index < (env.length() - 1)) {
+          val = env.substring(index + 1);
+        }
+        shellEnv.put(key, val);
+      }
+    }
+
+    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
+      shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
+
+      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
+        shellScriptPathTimestamp = Long.valueOf(envs
+            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
+      }
+      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
+        shellScriptPathLen = Long.valueOf(envs
+            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
+      }
+
+      if (!shellScriptPath.isEmpty()
+          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
+        LOG.error("Illegal values in env for shell script path" + ", path="
+            + shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
+            + shellScriptPathTimestamp);
+        throw new IllegalArgumentException(
+            "Illegal values in env for shell script path");
+      }
+    }
+
+    containerMemory = Integer.parseInt(cliParser.getOptionValue(
+        "container_memory", "10"));
+    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
+        "num_containers", "1"));
+    if (numTotalContainers == 0) {
+      throw new IllegalArgumentException(
+          "Cannot run distributed shell with no containers");
+    }
+    requestPriority = Integer.parseInt(cliParser
+        .getOptionValue("priority", "0"));
+
+    return true;
+  }
+
+  /**
+   * Helper function to print usage
+   *
+   * @param opts Parsed command line options
+   */
+  private void printUsage(Options opts) {
+    new HelpFormatter().printHelp("ApplicationMaster", opts);
+  }
+
+  /**
+   * Main run function for the application master
+   *
+   * @throws YarnException
+   * @throws IOException
+   */
+  @SuppressWarnings({ "unchecked" })
+  public boolean run() throws YarnException, IOException {
+    LOG.info("Starting ApplicationMaster");
+
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    // Now remove the AM->RM token so that containers cannot access it.
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+    amRMClient.init(conf);
+    amRMClient.start();
+
+    containerListener = createNMCallbackHandler();
+    nmClientAsync = new NMClientAsyncImpl(containerListener);
+    nmClientAsync.init(conf);
+    nmClientAsync.start();
+
+    // Setup local RPC Server to accept status requests directly from clients
+    // TODO need to setup a protocol for client to be able to communicate to
+    // the RPC server
+    // TODO use the rpc port info to register with the RM for the client to
+    // send requests to this app master
+
+    // Register self with ResourceManager
+    // This will start heartbeating to the RM
+    appMasterHostname = NetUtils.getHostname();
+    RegisterApplicationMasterResponse response = amRMClient
+        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+            appMasterTrackingUrl);
+    // Dump out information about cluster capability as seen by the
+    // resource manager
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    // A resource ask cannot exceed the max.
+    if (containerMemory > maxMem) {
+      LOG.info("Container memory specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerMemory + ", max="
+          + maxMem);
+      containerMemory = maxMem;
+    }
+
+    // Setup ask for containers from RM
+    // Send request for containers to RM
+    // Until we get our fully allocated quota, we keep on polling RM for
+    // containers
+    // Keep looping until all the containers are launched and shell script
+    // executed on them ( regardless of success/failure).
+    for (int i = 0; i < numTotalContainers; ++i) {
+      ContainerRequest containerAsk = setupContainerAskForRM();
+      amRMClient.addContainerRequest(containerAsk);
+    }
+    numRequestedContainers.set(numTotalContainers);
+
+    while (!done
+        && (numCompletedContainers.get() != numTotalContainers)) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException ex) {}
+    }
+    finish();
+    
+    return success;
+  }
+
+  @VisibleForTesting
+  NMCallbackHandler createNMCallbackHandler() {
+    return new NMCallbackHandler(this);
+  }
+
+  private void finish() {
+    // Join all launched threads
+    // needed for when we time out
+    // and we need to release containers
+    for (Thread launchThread : launchThreads) {
+      try {
+        launchThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Exception thrown in thread join: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+
+    // When the application completes, it should stop all running containers
+    LOG.info("Application completed. Stopping running containers");
+    nmClientAsync.stop();
+
+    // When the application completes, it should send a finish application
+    // signal to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinalApplicationStatus appStatus;
+    String appMessage = null;
+    success = true;
+    if (numFailedContainers.get() == 0 && 
+        numCompletedContainers.get() == numTotalContainers) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+      appMessage = "Diagnostics." + ", total=" + numTotalContainers
+          + ", completed=" + numCompletedContainers.get() + ", allocated="
+          + numAllocatedContainers.get() + ", failed="
+          + numFailedContainers.get();
+      success = false;
+    }
+    try {
+      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnException ex) {
+      LOG.error("Failed to unregister application", ex);
+    } catch (IOException e) {
+      LOG.error("Failed to unregister application", e);
+    }
+    
+    amRMClient.stop();
+  }
+  
+  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+      LOG.info("Got response from RM for container ask, completedCnt="
+          + completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID="
+            + containerStatus.getContainerId() + ", state="
+            + containerStatus.getState() + ", exitStatus="
+            + containerStatus.getExitStatus() + ", diagnostics="
+            + containerStatus.getDiagnostics());
+
+        // non complete containers should not be here
+        assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+        // increment counters for completed/failed containers
+        int exitStatus = containerStatus.getExitStatus();
+        if (0 != exitStatus) {
+          // container failed
+          if (ContainerExitStatus.ABORTED != exitStatus) {
+            // shell script failed
+            // counts as completed
+            numCompletedContainers.incrementAndGet();
+            numFailedContainers.incrementAndGet();
+          } else {
+            // container was killed by framework, possibly preempted
+            // we should re-try as the container was lost for some reason
+            numAllocatedContainers.decrementAndGet();
+            numRequestedContainers.decrementAndGet();
+            // we do not need to release the container as it would be done
+            // by the RM
+          }
+        } else {
+          // nothing to do
+          // container completed successfully
+          numCompletedContainers.incrementAndGet();
+          LOG.info("Container completed successfully." + ", containerId="
+              + containerStatus.getContainerId());
+        }
+      }
+      
+      // ask for more containers if any failed
+      int askCount = numTotalContainers - numRequestedContainers.get();
+      numRequestedContainers.addAndGet(askCount);
+
+      if (askCount > 0) {
+        for (int i = 0; i < askCount; ++i) {
+          ContainerRequest containerAsk = setupContainerAskForRM();
+          amRMClient.addContainerRequest(containerAsk);
+        }
+      }
+      
+      if (numCompletedContainers.get() == numTotalContainers) {
+        done = true;
+      }
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> allocatedContainers) {
+      LOG.info("Got response from RM for container ask, allocatedCnt="
+          + allocatedContainers.size());
+      numAllocatedContainers.addAndGet(allocatedContainers.size());
+      for (Container allocatedContainer : allocatedContainers) {
+        LOG.info("Launching shell command on a new container."
+            + ", containerId=" + allocatedContainer.getId()
+            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+            + ":" + allocatedContainer.getNodeId().getPort()
+            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+            + ", containerResourceMemory"
+            + allocatedContainer.getResource().getMemory());
+        // + ", containerToken"
+        // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+        LaunchContainerRunnable runnableLaunchContainer =
+            new LaunchContainerRunnable(allocatedContainer, containerListener);
+        Thread launchThread = new Thread(runnableLaunchContainer);
+
+        // launch and start the container on a separate thread to keep
+        // the main thread unblocked
+        // as all containers may not be allocated at one go.
+        launchThreads.add(launchThread);
+        launchThread.start();
+      }
+    }
+
+    @Override
+    public void onShutdownRequest() {
+      done = true;
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+    @Override
+    public float getProgress() {
+      // set progress to deliver to RM on next heartbeat
+      float progress = (float) numCompletedContainers.get()
+          / numTotalContainers;
+      return progress;
+    }
+
+    @Override
+    public void onError(Throwable e) {
+      done = true;
+      amRMClient.stop();
+    }
+  }
+
+  @VisibleForTesting
+  static class NMCallbackHandler
+    implements NMClientAsync.CallbackHandler {
+
+    private ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<ContainerId, Container>();
+    private final ApplicationMaster applicationMaster;
+
+    public NMCallbackHandler(ApplicationMaster applicationMaster) {
+      this.applicationMaster = applicationMaster;
+    }
+
+    public void addContainer(ContainerId containerId, Container container) {
+      containers.putIfAbsent(containerId, container);
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to stop Container " + containerId);
+      }
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container Status: id=" + containerId + ", status=" +
+            containerStatus);
+      }
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to start Container " + containerId);
+      }
+      Container container = containers.get(containerId);
+      if (container != null) {
+        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+      }
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to start Container " + containerId);
+      containers.remove(containerId);
+      applicationMaster.numCompletedContainers.incrementAndGet();
+      applicationMaster.numFailedContainers.incrementAndGet();
+    }
+
+    @Override
+    public void onGetContainerStatusError(
+        ContainerId containerId, Throwable t) {
+      LOG.error("Failed to query the status of Container " + containerId);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to stop Container " + containerId);
+      containers.remove(containerId);
+    }
+  }
+
+  /**
+   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
+   * that will execute the shell command.
+   */
+  private class LaunchContainerRunnable implements Runnable {
+
+    // Allocated container
+    Container container;
+
+    NMCallbackHandler containerListener;
+
+    /**
+     * @param lcontainer Allocated container
+     * @param containerListener Callback handler of the container
+     */
+    public LaunchContainerRunnable(
+        Container lcontainer, NMCallbackHandler containerListener) {
+      this.container = lcontainer;
+      this.containerListener = containerListener;
+    }
+
+    @Override
+    /**
+     * Connects to CM, sets up container launch context 
+     * for shell command and eventually dispatches the container 
+     * start request to the CM. 
+     */
+    public void run() {
+      LOG.info("Setting up container launch container for containerid="
+          + container.getId());
+      ContainerLaunchContext ctx = Records
+          .newRecord(ContainerLaunchContext.class);
+
+      // Set the environment
+      ctx.setEnvironment(shellEnv);
+
+      // Set the local resources
+      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+      // The container for the eventual shell commands needs its own local
+      // resources too.
+      // In this scenario, if a shell script is specified, we need to have it
+      // copied and made available to the container.
+      if (!shellScriptPath.isEmpty()) {
+        LocalResource shellRsrc = Records.newRecord(LocalResource.class);
+        shellRsrc.setType(LocalResourceType.FILE);
+        shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+        try {
+          shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
+              shellScriptPath)));
+        } catch (URISyntaxException e) {
+          LOG.error("Error when trying to use shell script path specified"
+              + " in env, path=" + shellScriptPath);
+          e.printStackTrace();
+
+          // A failure scenario on bad input such as invalid shell script path
+          // We know we cannot continue launching the container
+          // so we should release it.
+          // TODO
+          numCompletedContainers.incrementAndGet();
+          numFailedContainers.incrementAndGet();
+          return;
+        }
+        shellRsrc.setTimestamp(shellScriptPathTimestamp);
+        shellRsrc.setSize(shellScriptPathLen);
+        localResources.put(ExecShellStringPath, shellRsrc);
+      }
+      ctx.setLocalResources(localResources);
+
+      // Set the necessary command to execute on the allocated container
+      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+
+      // Set executable command
+      vargs.add(shellCommand);
+      // Set shell script path
+      if (!shellScriptPath.isEmpty()) {
+        vargs.add(ExecShellStringPath);
+      }
+
+      // Set args for the shell command if any
+      vargs.add(shellArgs);
+      // Add log redirect params
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+      // Get final commmand
+      StringBuilder command = new StringBuilder();
+      for (CharSequence str : vargs) {
+        command.append(str).append(" ");
+      }
+
+      List<String> commands = new ArrayList<String>();
+      commands.add(command.toString());
+      ctx.setCommands(commands);
+
+      // Set up tokens for the container too. Today, for normal shell commands,
+      // the container in distribute-shell doesn't need any tokens. We are
+      // populating them mainly for NodeManagers to be able to download any
+      // files in the distributed file-system. The tokens are otherwise also
+      // useful in cases, for e.g., when one is running a "hadoop dfs" command
+      // inside the distributed shell.
+      ctx.setTokens(allTokens.duplicate());
+
+      containerListener.addContainer(container.getId(), container);
+      nmClientAsync.startContainerAsync(container, ctx);
+    }
+  }
+
+  /**
+   * Setup the request that will be sent to the RM for the container ask.
+   *
+   * @return the setup ResourceRequest to be sent to RM
+   */
+  private ContainerRequest setupContainerAskForRM() {
+    // setup requirements for hosts
+    // using * as any host will do for the distributed shell app
+    // set the priority for the request
+    Priority pri = Records.newRecord(Priority.class);
+    // TODO - what is the range for priority? how to decide?
+    pri.setPriority(requestPriority);
+
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(containerMemory);
+
+    ContainerRequest request = new ContainerRequest(capability, null, null,
+        pri);
+    LOG.info("Requested container ask: " + request.toString());
+    return request;
+  }
+
+  public static ApplicationMaster getInstance() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
new file mode 100644
index 0000000..6671364
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
@@ -0,0 +1,19 @@
+package org.apache.helix.provisioning.yarn;
+
+public class ApplicationSpec {
+
+  int minContainers;
+
+  int maxContainers;
+
+  String serviceClass;
+    
+  String targetProvider;
+  
+  String stateModel;
+  
+  String taskClass;
+
+  int numTasks;
+  
+}