You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/01/07 23:30:54 UTC
[2/2] git commit: Adding support to use YARN for provisioning
Adding support to use YARN for provisioning
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/d23198c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/d23198c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/d23198c7
Branch: refs/heads/helix-provisioning
Commit: d23198c760e1d87f5d4a3c19d2ec57bdf4d3dfb0
Parents: 3153c5e
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Tue Jan 7 14:30:45 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Tue Jan 7 14:30:45 2014 -0800
----------------------------------------------------------------------
.../controller/provisioner/ContainerSpec.java | 2 +
helix-provisioning/.gitignore | 16 +
helix-provisioning/DISCLAIMER | 15 +
helix-provisioning/LICENSE | 273 ++++++
helix-provisioning/NOTICE | 30 +
helix-provisioning/pom.xml | 116 +++
helix-provisioning/src/assemble/assembly.xml | 60 ++
.../src/main/config/log4j.properties | 31 +
.../provisioning/yarn/ApplicationMaster.java | 889 +++++++++++++++++++
.../provisioning/yarn/ApplicationSpec.java | 19 +
.../apache/helix/provisioning/yarn/Client.java | 692 +++++++++++++++
.../provisioning/yarn/ContainerAskResponse.java | 17 +
.../yarn/ContainerLaunchResponse.java | 5 +
.../yarn/ContainerReleaseResponse.java | 5 +
.../yarn/ContainerStopResponse.java | 5 +
.../helix/provisioning/yarn/DSConstants.java | 47 +
.../yarn/GenericApplicationMaster.java | 306 +++++++
.../yarn/LaunchContainerRunnable.java | 79 ++
.../provisioning/yarn/NMCallbackHandler.java | 74 ++
.../provisioning/yarn/RMCallbackHandler.java | 109 +++
.../yarn/YarnApplicationMaster.java | 16 +
.../provisioning/yarn/YarnProvisioner.java | 120 +++
helix-provisioning/src/test/conf/testng.xml | 27 +
pom.xml | 1 +
24 files changed, 2954 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
index e814059..b393a64 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
@@ -24,6 +24,8 @@ public class ContainerSpec {
* Some unique id representing the container.
*/
ContainerId containerId;
+
+ String memory;
public ContainerSpec(ContainerId containerId) {
this.containerId = containerId;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/.gitignore
----------------------------------------------------------------------
diff --git a/helix-provisioning/.gitignore b/helix-provisioning/.gitignore
new file mode 100644
index 0000000..2411bd8
--- /dev/null
+++ b/helix-provisioning/.gitignore
@@ -0,0 +1,16 @@
+/target
+/.project
+/.classpath
+/.settings
+/zkdata
+/test-output
+/src/main/scripts/integration-test/var
+#/src/test/java/com/linkedin/dds/
+#/src/main/scripts/integration-test/config
+/src/main/scripts/target/
+/src/main/scripts/integration-test/script/.metadata_infra
+#/src/main/scripts/integration-test/script/dds_driver.py
+#/src/main/scripts/integration-test/script/pexpect.py
+#/src/main/scripts/integration-test/script/utility.py
+*.pyc
+/bin
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/DISCLAIMER
----------------------------------------------------------------------
diff --git a/helix-provisioning/DISCLAIMER b/helix-provisioning/DISCLAIMER
new file mode 100644
index 0000000..2001d31
--- /dev/null
+++ b/helix-provisioning/DISCLAIMER
@@ -0,0 +1,15 @@
+Apache Helix is an effort undergoing incubation at the Apache Software
+Foundation (ASF), sponsored by the Apache Incubator PMC.
+
+Incubation is required of all newly accepted projects until a further review
+indicates that the infrastructure, communications, and decision making process
+have stabilized in a manner consistent with other successful ASF projects.
+
+While incubation status is not necessarily a reflection of the completeness
+or stability of the code, it does indicate that the project has yet to be
+fully endorsed by the ASF.
+
+For more information about the incubation status of the Apache Helix project you
+can go to the following page:
+
+http://incubator.apache.org/projects/helix.html
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/LICENSE
----------------------------------------------------------------------
diff --git a/helix-provisioning/LICENSE b/helix-provisioning/LICENSE
new file mode 100644
index 0000000..413913f
--- /dev/null
+++ b/helix-provisioning/LICENSE
@@ -0,0 +1,273 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+
+For xstream:
+
+Copyright (c) 2003-2006, Joe Walnes
+Copyright (c) 2006-2009, 2011 XStream Committers
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or other materials provided
+with the distribution.
+
+3. Neither the name of XStream nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+For jline:
+
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/NOTICE
----------------------------------------------------------------------
diff --git a/helix-provisioning/NOTICE b/helix-provisioning/NOTICE
new file mode 100644
index 0000000..e070e15
--- /dev/null
+++ b/helix-provisioning/NOTICE
@@ -0,0 +1,30 @@
+Apache Helix
+Copyright 2012 The Apache Software Foundation
+
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+Codehaus (http://www.codehaus.org/)
+Licensed under the BSD License.
+
+This product includes software developed at
+jline (http://jline.sourceforge.net/)
+Licensed under the BSD License.
+
+This product includes software developed at
+josql (http://sourceforge.net/projects/josql/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+restlet (http://www.restlet.org/about/legal).
+Licensed under the Apache License 2.0.
+
+
+II. License Summary
+- Apache License 2.0
+- BSD License
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
new file mode 100644
index 0000000..1f5a585
--- /dev/null
+++ b/helix-provisioning/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix</artifactId>
+ <version>0.7.1-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>helix-provisioning</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Helix :: Provisioning</name>
+
+ <properties>
+ <hadoop.version>2.2.0</hadoop.version>
+ <osgi.import>
+ org.apache.helix*,
+ org.apache.commons.cli;version="[1.2,2)",
+ org.apache.log4j,
+ *
+ </osgi.import>
+ <osgi.export>org.apache.helix.provisioning.yarn*;version="${project.version};-noimport:=true</osgi.export>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>org.apache.helix.provisioning.HelixAgentMain</mainClass>
+ <name>start-helix-provisioning</name>
+ </program>
+ </programs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/assemble/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/assemble/assembly.xml b/helix-provisioning/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/helix-provisioning/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+ <id>pkg</id>
+ <formats>
+ <format>tar</format>
+ </formats>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+ <outputDirectory>repo</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ <excludes>
+ <exclude>**/*.xml</exclude>
+ </excludes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+ <outputDirectory>conf</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>LICENSE</include>
+ <include>NOTICE</include>
+ <include>DISCLAIMER</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/config/log4j.properties b/helix-provisioning/src/main/config/log4j.properties
new file mode 100644
index 0000000..91fac03
--- /dev/null
+++ b/helix-provisioning/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
new file mode 100644
index 0000000..d63b300
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
@@ -0,0 +1,889 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.provisioning.yarn;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * An ApplicationMaster for executing shell commands on a set of launched
+ * containers using the YARN framework. It follows the YARN distributed-shell
+ * example application master.
+ *
+ * <p>
+ * The ApplicationMaster is started on a container by the
+ * <code>ResourceManager</code>'s launcher. The first thing that the
+ * <code>ApplicationMaster</code> needs to do is to connect and register itself
+ * with the <code>ResourceManager</code>. The registration sets up information
+ * within the <code>ResourceManager</code> regarding what host:port the
+ * ApplicationMaster is listening on to provide any form of functionality to a
+ * client as well as a tracking url that a client can use to keep track of
+ * status/job history if needed. This implementation does not provide a tracking
+ * url or an RPC endpoint (appMasterHost:appMasterRpcPort) yet.
+ * </p>
+ *
+ * <p>
+ * The <code>ApplicationMaster</code> needs to send a heartbeat to the
+ * <code>ResourceManager</code> at regular intervals to inform the
+ * <code>ResourceManager</code> that it is up and alive. The
+ * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
+ * <code>ApplicationMaster</code> acts as a heartbeat.
+ * </p>
+ *
+ * <p>
+ * For the actual handling of the job, the <code>ApplicationMaster</code> has to
+ * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
+ * required no. of containers using {@link ResourceRequest} with the necessary
+ * resource specifications such as node location, computational
+ * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
+ * responds with an {@link AllocateResponse} that informs the
+ * <code>ApplicationMaster</code> of the set of newly allocated containers,
+ * completed containers as well as current state of available resources.
+ * </p>
+ *
+ * <p>
+ * For each allocated container, the <code>ApplicationMaster</code> can then set
+ * up the necessary launch context via {@link ContainerLaunchContext} to specify
+ * the allocated container id, local resources required by the executable, the
+ * environment to be setup for the executable, commands to execute, etc. and
+ * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
+ * launch and execute the defined commands on the given allocated container.
+ * </p>
+ *
+ * <p>
+ * The <code>ApplicationMaster</code> can monitor the launched container by
+ * either querying the <code>ResourceManager</code> using
+ * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
+ * the {@link ContainerManagementProtocol} by querying for the status of the allocated
+ * container's {@link ContainerId}.
+ * </p>
+ *
+ * <p>
+ * After the job has been completed, the <code>ApplicationMaster</code> has to
+ * send a {@link FinishApplicationMasterRequest} to the
+ * <code>ResourceManager</code> to inform it that the
+ * <code>ApplicationMaster</code> has been completed.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ApplicationMaster {
+
+  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+  /** Interval, in milliseconds, handed to the async RM client when it is created. */
+  private static final int RM_HEARTBEAT_INTERVAL_MS = 1000;
+
+  /** Sleep, in milliseconds, between completion checks in {@link #run()}. */
+  private static final long COMPLETION_POLL_INTERVAL_MS = 200;
+
+  /** Maximum wait, in milliseconds, for each launch thread during {@link #finish()}. */
+  private static final long LAUNCH_THREAD_JOIN_TIMEOUT_MS = 10000;
+
+  /**
+   * Name under which the shell script is registered as a local resource of each
+   * launched container, and which is passed to the shell command.
+   */
+  private static final String EXEC_SHELL_STRING_PATH = "ExecShellScript.sh";
+
+  // Configuration
+  private final Configuration conf;
+
+  // Handle to communicate with the Resource Manager
+  private AMRMClientAsync<ContainerRequest> amRMClient;
+
+  // Handle to communicate with the Node Manager
+  private NMClientAsync nmClientAsync;
+  // Listener that processes the responses from the Node Manager
+  private NMCallbackHandler containerListener;
+
+  // Application Attempt Id (combination of attemptId and fail count)
+  private ApplicationAttemptId appAttemptID;
+
+  // TODO: status updates for clients are yet to be implemented.
+  // Hostname of the container
+  private String appMasterHostname = "";
+  // Port on which the app master listens for status updates from clients
+  private int appMasterRpcPort = -1;
+  // Tracking url to which app master publishes info for clients to monitor
+  private String appMasterTrackingUrl = "";
+
+  // App Master configuration
+  // No. of containers to run shell command on
+  private int numTotalContainers = 1;
+  // Memory (MB) to request for the container on which the shell command will run
+  private int containerMemory = 10;
+  // Priority of the request
+  private int requestPriority;
+
+  // The counters below are updated from the RM/NM callback handlers and from the
+  // container launch threads, hence the atomics.
+  // Counter for completed containers (complete denotes successful or failed)
+  private final AtomicInteger numCompletedContainers = new AtomicInteger();
+  // Number of containers the RM has allocated to us
+  private final AtomicInteger numAllocatedContainers = new AtomicInteger();
+  // Count of failed containers
+  private final AtomicInteger numFailedContainers = new AtomicInteger();
+  // Count of containers already requested from the RM.
+  // Needed as once requested, we should not request for containers again.
+  // Only request for more if the original requirement changes.
+  private final AtomicInteger numRequestedContainers = new AtomicInteger();
+
+  // Shell command to be executed
+  private String shellCommand = "";
+  // Args to be passed to the shell command
+  private String shellArgs = "";
+  // Env variables to be setup for the shell command
+  private final Map<String, String> shellEnv = new HashMap<String, String>();
+
+  // Location of shell script (obtained from info set in env)
+  // Shell script path in fs
+  private String shellScriptPath = "";
+  // Timestamp needed for creating a local resource
+  private long shellScriptPathTimestamp = 0;
+  // File length needed for local resource
+  private long shellScriptPathLen = 0;
+
+  // Set when the AM should stop waiting: all containers completed, the RM asked
+  // for a shutdown, or the RM client reported an error.
+  private volatile boolean done;
+  // Final outcome of the application, computed in finish()
+  private volatile boolean success;
+
+  // Serialized credentials handed to every launched container
+  private ByteBuffer allTokens;
+
+  // Launch threads. Appended to from the RM callback handler and read in finish(),
+  // so every access synchronizes on the list itself.
+  private final List<Thread> launchThreads = new ArrayList<Thread>();
+
+  /**
+   * Entry point. Exits the JVM with 0 on success (or when only help was printed),
+   * 1 when an unexpected error is thrown, and 2 when the application finished
+   * unsuccessfully.
+   *
+   * @param args Command line args
+   */
+  public static void main(String[] args) {
+    boolean result = false;
+    try {
+      ApplicationMaster appMaster = new ApplicationMaster();
+      LOG.info("Initializing ApplicationMaster");
+      LOG.info("classpath:" + System.getProperty("java.class.path"));
+      boolean doRun = appMaster.init(args);
+      if (!doRun) {
+        System.exit(0);
+      }
+      result = appMaster.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running ApplicationMaster", t);
+      System.exit(1);
+    }
+    if (result) {
+      LOG.info("Application Master completed successfully. exiting");
+      System.exit(0);
+    } else {
+      LOG.info("Application Master failed. exiting");
+      System.exit(2);
+    }
+  }
+
+  /**
+   * Dumps the environment and the contents of $CWD (via {@code ls -al}) to both the
+   * log and stdout for debugging. Failures are logged and otherwise ignored.
+   */
+  private void dumpOutDebugInfo() {
+    LOG.info("Dump debug output");
+    Map<String, String> envs = System.getenv();
+    for (Map.Entry<String, String> env : envs.entrySet()) {
+      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
+      System.out.println("System env: key=" + env.getKey() + ", val="
+          + env.getValue());
+    }
+
+    BufferedReader buf = null;
+    try {
+      Process pr = Runtime.getRuntime().exec("ls -al");
+      // Drain stdout before waiting for the process: a child that fills the pipe
+      // buffer blocks until it is read, so waiting first can deadlock.
+      buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+      String line;
+      while ((line = buf.readLine()) != null) {
+        LOG.info("System CWD content: " + line);
+        System.out.println("System CWD content: " + line);
+      }
+      pr.waitFor();
+    } catch (IOException e) {
+      LOG.warn("Failed to list the current working directory", e);
+    } catch (InterruptedException e) {
+      // Best-effort debug output: keep going, but preserve the interrupt for callers.
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while listing the current working directory", e);
+    } finally {
+      if (buf != null) {
+        try {
+          buf.close();
+        } catch (IOException e) {
+          LOG.debug("Failed to close the process output reader", e);
+        }
+      }
+    }
+  }
+
+  public ApplicationMaster() {
+    // Set up the configuration
+    conf = new YarnConfiguration();
+  }
+
+  /**
+   * Parses command line options and the container environment.
+   *
+   * @param args Command line args
+   * @return Whether init successful and run should be invoked (false when only help
+   *         was requested)
+   * @throws ParseException if the command line cannot be parsed
+   * @throws IOException declared for callers; not thrown directly here
+   * @throws IllegalArgumentException if no args are given, a required option or the
+   *         attempt id is missing, the shell script env values are invalid, or
+   *         {@code num_containers} is not positive
+   * @throws RuntimeException if a required YARN environment variable is missing
+   */
+  public boolean init(String[] args) throws ParseException, IOException {
+    Options opts = new Options();
+    opts.addOption("app_attempt_id", true,
+        "App Attempt ID. Not to be used unless for testing purposes");
+    opts.addOption("shell_command", true,
+        "Shell command to be executed by the Application Master");
+    opts.addOption("shell_script", true,
+        "Location of the shell script to be executed");
+    opts.addOption("shell_args", true, "Command line args for the shell script");
+    opts.addOption("shell_env", true,
+        "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("container_memory", true,
+        "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("num_containers", true,
+        "No. of containers on which the shell command needs to be executed");
+    opts.addOption("priority", true, "Application Priority. Default 0");
+    opts.addOption("debug", false, "Dump out debug information");
+    opts.addOption("help", false, "Print usage");
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (args.length == 0) {
+      printUsage(opts);
+      throw new IllegalArgumentException(
+          "No args specified for application master to initialize");
+    }
+
+    if (cliParser.hasOption("help")) {
+      printUsage(opts);
+      return false;
+    }
+
+    if (cliParser.hasOption("debug")) {
+      dumpOutDebugInfo();
+    }
+
+    Map<String, String> envs = System.getenv();
+    appAttemptID = resolveAppAttemptId(envs, cliParser);
+
+    requireEnv(envs, ApplicationConstants.APP_SUBMIT_TIME_ENV);
+    requireEnv(envs, Environment.NM_HOST.name());
+    requireEnv(envs, Environment.NM_HTTP_PORT.name());
+    requireEnv(envs, Environment.NM_PORT.name());
+
+    LOG.info("Application master for app" + ", appId="
+        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
+        + appAttemptID.getApplicationId().getClusterTimestamp()
+        + ", attemptId=" + appAttemptID.getAttemptId());
+
+    if (!cliParser.hasOption("shell_command")) {
+      throw new IllegalArgumentException(
+          "No shell command specified to be executed by application master");
+    }
+    shellCommand = cliParser.getOptionValue("shell_command");
+
+    if (cliParser.hasOption("shell_args")) {
+      shellArgs = cliParser.getOptionValue("shell_args");
+    }
+    if (cliParser.hasOption("shell_env")) {
+      parseShellEnv(cliParser.getOptionValues("shell_env"));
+    }
+
+    readShellScriptLocation(envs);
+
+    containerMemory = Integer.parseInt(cliParser.getOptionValue(
+        "container_memory", "10"));
+    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
+        "num_containers", "1"));
+    // A negative count would never be reached by the completion counter in run()
+    // and the AM would wait forever, so reject it together with zero.
+    if (numTotalContainers <= 0) {
+      throw new IllegalArgumentException(
+          "Cannot run distributed shell with no containers");
+    }
+    requestPriority = Integer.parseInt(cliParser
+        .getOptionValue("priority", "0"));
+
+    return true;
+  }
+
+  /**
+   * Determines the application attempt id, preferring the container id found in the
+   * environment and falling back to the {@code app_attempt_id} option.
+   */
+  private static ApplicationAttemptId resolveAppAttemptId(Map<String, String> envs,
+      CommandLine cliParser) {
+    if (envs.containsKey(Environment.CONTAINER_ID.name())) {
+      ContainerId containerId = ConverterUtils.toContainerId(envs
+          .get(Environment.CONTAINER_ID.name()));
+      return containerId.getApplicationAttemptId();
+    }
+    if (cliParser.hasOption("app_attempt_id")) {
+      String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
+      return ConverterUtils.toApplicationAttemptId(appIdStr);
+    }
+    throw new IllegalArgumentException(
+        "Application Attempt Id not set in the environment");
+  }
+
+  /** Throws a RuntimeException when {@code key} is absent from {@code envs}. */
+  private static void requireEnv(Map<String, String> envs, String key) {
+    if (!envs.containsKey(key)) {
+      throw new RuntimeException(key + " not set in the environment");
+    }
+  }
+
+  /** Adds trimmed {@code key=value} entries to shellEnv; a bare key maps to "". */
+  private void parseShellEnv(String[] shellEnvs) {
+    for (String rawEnv : shellEnvs) {
+      String env = rawEnv.trim();
+      int index = env.indexOf('=');
+      if (index == -1) {
+        shellEnv.put(env, "");
+      } else {
+        // Everything after the first '=' is the value ("" when '=' is last).
+        shellEnv.put(env.substring(0, index), env.substring(index + 1));
+      }
+    }
+  }
+
+  /** Reads the optional shell script path, timestamp and length from the environment. */
+  private void readShellScriptLocation(Map<String, String> envs) {
+    if (!envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
+      return;
+    }
+    shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
+
+    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
+      shellScriptPathTimestamp = Long.parseLong(envs
+          .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
+    }
+    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
+      shellScriptPathLen = Long.parseLong(envs
+          .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
+    }
+
+    // A local resource needs a positive timestamp and length.
+    if (!shellScriptPath.isEmpty()
+        && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
+      LOG.error("Illegal values in env for shell script path" + ", path="
+          + shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
+          + shellScriptPathTimestamp);
+      throw new IllegalArgumentException(
+          "Illegal values in env for shell script path");
+    }
+  }
+
+  /**
+   * Helper function to print usage
+   *
+   * @param opts Parsed command line options
+   */
+  private void printUsage(Options opts) {
+    new HelpFormatter().printHelp("ApplicationMaster", opts);
+  }
+
+  /**
+   * Main run function for the application master: registers with the RM, asks for
+   * {@code numTotalContainers} containers, waits until they complete (or the AM is
+   * told to stop) and then unregisters.
+   *
+   * @return true if every container completed without failure
+   * @throws YarnException if registering with the RM fails
+   * @throws IOException if reading credentials or registering with the RM fails
+   */
+  public boolean run() throws YarnException, IOException {
+    LOG.info("Starting ApplicationMaster");
+
+    allTokens = serializeTokensForContainers();
+
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    amRMClient = AMRMClientAsync.createAMRMClientAsync(RM_HEARTBEAT_INTERVAL_MS,
+        allocListener);
+    amRMClient.init(conf);
+    amRMClient.start();
+
+    containerListener = createNMCallbackHandler();
+    nmClientAsync = new NMClientAsyncImpl(containerListener);
+    nmClientAsync.init(conf);
+    nmClientAsync.start();
+
+    // TODO setup a local RPC server (and a protocol) so clients can query this
+    // app master directly, and register its port with the RM.
+
+    // Register self with ResourceManager
+    appMasterHostname = NetUtils.getHostname();
+    RegisterApplicationMasterResponse response = amRMClient
+        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+            appMasterTrackingUrl);
+    // Dump out information about cluster capability as seen by the
+    // resource manager
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    // A resource ask cannot exceed the max.
+    if (containerMemory > maxMem) {
+      LOG.info("Container memory specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerMemory + ", max="
+          + maxMem);
+      containerMemory = maxMem;
+    }
+
+    // Ask the RM for all containers up front; the RM callbacks launch them as
+    // they are allocated and re-ask for any that are lost.
+    for (int i = 0; i < numTotalContainers; ++i) {
+      amRMClient.addContainerRequest(setupContainerAskForRM());
+    }
+    numRequestedContainers.set(numTotalContainers);
+
+    boolean interrupted = waitForCompletion();
+    finish();
+    if (interrupted) {
+      // Restore the flag only after finish() so that unregistering is not disturbed.
+      Thread.currentThread().interrupt();
+    }
+    return success;
+  }
+
+  /**
+   * Serializes the current user's credentials for the launched containers, without
+   * the AM->RM token so that containers cannot use it.
+   */
+  private static ByteBuffer serializeTokensForContainers() throws IOException {
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    // Remove the AM->RM token *before* serializing; otherwise it would still be part
+    // of the bytes handed to every container.
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+  }
+
+  /**
+   * Polls until every container has completed or {@code done} is set.
+   *
+   * @return true if the wait was cut short by an interrupt
+   */
+  private boolean waitForCompletion() {
+    while (!done
+        && (numCompletedContainers.get() != numTotalContainers)) {
+      try {
+        Thread.sleep(COMPLETION_POLL_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while waiting for containers to complete", e);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  NMCallbackHandler createNMCallbackHandler() {
+    return new NMCallbackHandler(this);
+  }
+
+  /**
+   * Joins the launch threads, stops the NM client, unregisters from the RM with the
+   * final status and stops the RM client. Sets {@code success}.
+   */
+  private void finish() {
+    // Join all launched threads; needed for when we time out and we need to
+    // release containers. Work on a snapshot because the RM callback handler may
+    // still be appending to the list.
+    List<Thread> threadsToJoin;
+    synchronized (launchThreads) {
+      threadsToJoin = new ArrayList<Thread>(launchThreads);
+    }
+    boolean interrupted = false;
+    for (Thread launchThread : threadsToJoin) {
+      try {
+        launchThread.join(LAUNCH_THREAD_JOIN_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        // Keep shutting down; the interrupt is restored once cleanup is done.
+        interrupted = true;
+        LOG.info("Exception thrown in thread join: " + e.getMessage(), e);
+      }
+    }
+
+    // When the application completes, it should stop all running containers
+    LOG.info("Application completed. Stopping running containers");
+    nmClientAsync.stop();
+
+    // When the application completes, it should send a finish application
+    // signal to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinalApplicationStatus appStatus;
+    String appMessage = null;
+    success = true;
+    if (numFailedContainers.get() == 0 &&
+        numCompletedContainers.get() == numTotalContainers) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+      appMessage = "Diagnostics." + ", total=" + numTotalContainers
+          + ", completed=" + numCompletedContainers.get() + ", allocated="
+          + numAllocatedContainers.get() + ", failed="
+          + numFailedContainers.get();
+      success = false;
+    }
+    try {
+      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnException ex) {
+      LOG.error("Failed to unregister application", ex);
+    } catch (IOException e) {
+      LOG.error("Failed to unregister application", e);
+    }
+
+    amRMClient.stop();
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Handles allocation, completion and lifecycle callbacks from the RM client. */
+  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+    /**
+     * Updates the counters for finished containers, re-asks for containers that were
+     * aborted by the framework, and sets {@code done} once all have completed.
+     */
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+      LOG.info("Got response from RM for container ask, completedCnt="
+          + completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID="
+            + containerStatus.getContainerId() + ", state="
+            + containerStatus.getState() + ", exitStatus="
+            + containerStatus.getExitStatus() + ", diagnostics="
+            + containerStatus.getDiagnostics());
+
+        // non complete containers should not be here
+        assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+        int exitStatus = containerStatus.getExitStatus();
+        if (exitStatus == 0) {
+          // container completed successfully
+          numCompletedContainers.incrementAndGet();
+          LOG.info("Container completed successfully." + ", containerId="
+              + containerStatus.getContainerId());
+        } else if (exitStatus != ContainerExitStatus.ABORTED) {
+          // shell script failed; counts as completed
+          numCompletedContainers.incrementAndGet();
+          numFailedContainers.incrementAndGet();
+        } else {
+          // container was killed by framework, possibly preempted: re-try as the
+          // container was lost. No release is needed, the RM does that.
+          numAllocatedContainers.decrementAndGet();
+          numRequestedContainers.decrementAndGet();
+        }
+      }
+
+      // Re-ask for the containers lost above so that the total asked stays at
+      // numTotalContainers.
+      int askCount = numTotalContainers - numRequestedContainers.get();
+      numRequestedContainers.addAndGet(askCount);
+      for (int i = 0; i < askCount; ++i) {
+        amRMClient.addContainerRequest(setupContainerAskForRM());
+      }
+
+      if (numCompletedContainers.get() == numTotalContainers) {
+        done = true;
+      }
+    }
+
+    /** Starts a launch thread for every newly allocated container. */
+    @Override
+    public void onContainersAllocated(List<Container> allocatedContainers) {
+      LOG.info("Got response from RM for container ask, allocatedCnt="
+          + allocatedContainers.size());
+      numAllocatedContainers.addAndGet(allocatedContainers.size());
+      for (Container allocatedContainer : allocatedContainers) {
+        LOG.info("Launching shell command on a new container."
+            + ", containerId=" + allocatedContainer.getId()
+            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+            + ":" + allocatedContainer.getNodeId().getPort()
+            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+            + ", containerResourceMemory"
+            + allocatedContainer.getResource().getMemory());
+
+        // Launch and start the container on a separate thread to keep this
+        // callback unblocked, as all containers may not be allocated at one go.
+        Thread launchThread = new Thread(
+            new LaunchContainerRunnable(allocatedContainer, containerListener));
+        synchronized (launchThreads) {
+          launchThreads.add(launchThread);
+        }
+        launchThread.start();
+      }
+    }
+
+    @Override
+    public void onShutdownRequest() {
+      done = true;
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+    /** Fraction of containers completed, reported to the RM. */
+    @Override
+    public float getProgress() {
+      return (float) numCompletedContainers.get() / numTotalContainers;
+    }
+
+    @Override
+    public void onError(Throwable e) {
+      LOG.error("Error reported by the RM client", e);
+      done = true;
+      amRMClient.stop();
+    }
+  }
+
+  /**
+   * Tracks started containers and reacts to Node Manager callbacks; start failures
+   * count as completed and failed containers.
+   */
+  @VisibleForTesting
+  static class NMCallbackHandler
+      implements NMClientAsync.CallbackHandler {
+
+    private final ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<ContainerId, Container>();
+    private final ApplicationMaster applicationMaster;
+
+    public NMCallbackHandler(ApplicationMaster applicationMaster) {
+      this.applicationMaster = applicationMaster;
+    }
+
+    public void addContainer(ContainerId containerId, Container container) {
+      containers.putIfAbsent(containerId, container);
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to stop Container " + containerId);
+      }
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container Status: id=" + containerId + ", status=" +
+            containerStatus);
+      }
+    }
+
+    /** Requests the status of a started container that is still being tracked. */
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to start Container " + containerId);
+      }
+      Container container = containers.get(containerId);
+      if (container != null) {
+        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId,
+            container.getNodeId());
+      }
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to start Container " + containerId, t);
+      containers.remove(containerId);
+      applicationMaster.numCompletedContainers.incrementAndGet();
+      applicationMaster.numFailedContainers.incrementAndGet();
+    }
+
+    @Override
+    public void onGetContainerStatusError(
+        ContainerId containerId, Throwable t) {
+      LOG.error("Failed to query the status of Container " + containerId, t);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to stop Container " + containerId, t);
+      containers.remove(containerId);
+    }
+  }
+
+  /**
+   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
+   * that will execute the shell command.
+   */
+  private class LaunchContainerRunnable implements Runnable {
+
+    // Allocated container
+    private final Container container;
+
+    private final NMCallbackHandler containerListener;
+
+    /**
+     * @param lcontainer Allocated container
+     * @param containerListener Callback handler of the container
+     */
+    public LaunchContainerRunnable(
+        Container lcontainer, NMCallbackHandler containerListener) {
+      this.container = lcontainer;
+      this.containerListener = containerListener;
+    }
+
+    /**
+     * Sets up the container launch context for the shell command and dispatches the
+     * container start request through the NM client. An invalid shell script path
+     * marks the container as completed and failed without starting it.
+     */
+    @Override
+    public void run() {
+      LOG.info("Setting up container launch container for containerid="
+          + container.getId());
+      ContainerLaunchContext ctx = Records
+          .newRecord(ContainerLaunchContext.class);
+
+      // Set the environment
+      ctx.setEnvironment(shellEnv);
+
+      // If a shell script is specified, it is made available to the container as a
+      // local resource.
+      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+      if (!shellScriptPath.isEmpty()) {
+        LocalResource shellRsrc = Records.newRecord(LocalResource.class);
+        shellRsrc.setType(LocalResourceType.FILE);
+        shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+        try {
+          shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
+              shellScriptPath)));
+        } catch (URISyntaxException e) {
+          LOG.error("Error when trying to use shell script path specified"
+              + " in env, path=" + shellScriptPath, e);
+          // We cannot launch the container on bad input, so count it as failed.
+          // TODO release the allocated container.
+          numCompletedContainers.incrementAndGet();
+          numFailedContainers.incrementAndGet();
+          return;
+        }
+        shellRsrc.setTimestamp(shellScriptPathTimestamp);
+        shellRsrc.setSize(shellScriptPathLen);
+        localResources.put(EXEC_SHELL_STRING_PATH, shellRsrc);
+      }
+      ctx.setLocalResources(localResources);
+
+      // Build the command: executable, optional script, args, log redirects.
+      List<CharSequence> vargs = new ArrayList<CharSequence>(5);
+      vargs.add(shellCommand);
+      if (!shellScriptPath.isEmpty()) {
+        vargs.add(EXEC_SHELL_STRING_PATH);
+      }
+      vargs.add(shellArgs);
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+      StringBuilder command = new StringBuilder();
+      for (CharSequence str : vargs) {
+        command.append(str).append(" ");
+      }
+
+      List<String> commands = new ArrayList<String>();
+      commands.add(command.toString());
+      ctx.setCommands(commands);
+
+      // The shell command itself usually needs no tokens; they are passed so that
+      // NodeManagers can download files from the distributed file-system, and so
+      // commands such as "hadoop dfs" work inside the container.
+      ctx.setTokens(allTokens.duplicate());
+
+      containerListener.addContainer(container.getId(), container);
+      nmClientAsync.startContainerAsync(container, ctx);
+    }
+  }
+
+  /**
+   * Setup the request that will be sent to the RM for the container ask: any host,
+   * {@code containerMemory} MB and {@code requestPriority}.
+   *
+   * @return the ContainerRequest to be sent to RM
+   */
+  private ContainerRequest setupContainerAskForRM() {
+    // set the priority for the request
+    Priority pri = Records.newRecord(Priority.class);
+    // TODO - what is the range for priority? how to decide?
+    pri.setPriority(requestPriority);
+
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(containerMemory);
+
+    // null nodes and racks: any host will do
+    ContainerRequest request = new ContainerRequest(capability, null, null,
+        pri);
+    LOG.info("Requested container ask: " + request.toString());
+    return request;
+  }
+
+  /**
+   * Placeholder accessor: no instance is tracked yet, so this always returns null.
+   *
+   * @return {@code null}
+   */
+  public static ApplicationMaster getInstance() {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d23198c7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
new file mode 100644
index 0000000..6671364
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.provisioning.yarn;
+
+/**
+ * Mutable, package-private holder for the settings of an application to be
+ * provisioned on YARN.
+ *
+ * <p>NOTE(review): nothing in this file reads or writes these fields, so the
+ * per-field notes below are inferred from the names only - confirm them against
+ * the code that populates and consumes this class.
+ */
+public class ApplicationSpec {
+
+  /** Presumably the minimum number of containers for the application. */
+  int minContainers;
+
+  /** Presumably the maximum number of containers for the application. */
+  int maxContainers;
+
+  /** Presumably the fully-qualified name of the service class to run. */
+  String serviceClass;
+
+  /** Presumably identifies the target provider to use; format not defined here. */
+  String targetProvider;
+
+  /** Presumably the name of the Helix state model; format not defined here. */
+  String stateModel;
+
+  /** Presumably the fully-qualified name of the task class to run. */
+  String taskClass;
+
+  /** Presumably the number of tasks to create. */
+  int numTasks;
+
+}