You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/11/21 22:54:31 UTC

[08/15] Initial import commit.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..e6f0522
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,556 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.twill</groupId>
+    <artifactId>twill-parent</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <name>Twill library parent project</name>
+
+    <modules>
+        <module>common</module>
+        <module>discovery-api</module>
+        <module>api</module>
+        <module>zookeeper</module>
+        <module>discovery-core</module>
+        <module>core</module>
+        <module>yarn</module>
+    </modules>
+
+    <repositories>
+        <repository>
+            <id>cloudera-releases</id>
+            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
+        <hadoop.version>[2.0.2-alpha,2.2.0]</hadoop.version>
+        <hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
+    </properties>
+
+    <scm>
+        <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-twill.git</connection>
+        <url>https://git-wip-us.apache.org/repos/asf?p=incubator-twill.git;a=summary</url>
+        <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-twill.git</developerConnection>
+    </scm>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-source-plugin</artifactId>
+                    <version>2.2.1</version>
+                    <configuration>
+                        <excludeResources>true</excludeResources>
+                    </configuration>
+                    <executions>
+                        <execution>
+                            <id>attach-sources</id>
+                            <phase>package</phase>
+                            <goals>
+                                <goal>jar-no-fork</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-javadoc-plugin</artifactId>
+                    <version>2.9</version>
+                    <configuration>
+                        <excludePackageNames>*.internal.*</excludePackageNames>
+                        <links>
+                            <link>http://download.oracle.com/javase/6/docs/api/</link>
+                        </links>
+                        <bottom>
+                            <![CDATA[Copyright &#169; 2013 <a href="http://www.apache.org">The Apache Software Foundation</a>. All rights reserved.]]>
+                        </bottom>
+                    </configuration>
+                    <executions>
+                        <execution>
+                            <id>attach-javadoc</id>
+                            <phase>package</phase>
+                            <goals>
+                                <goal>jar</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-release-plugin</artifactId>
+                <version>2.4.1</version>
+                <configuration>
+                    <autoVersionSubmodules>true</autoVersionSubmodules>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.2</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.14.1</version>
+                <configuration>
+                    <argLine>-Xmx512m</argLine>
+                    <redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
+                    <systemPropertyVariables>
+                        <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+                    </systemPropertyVariables>
+                    <includes>
+                        <include>**/*TestSuite.java</include>
+                        <include>**/Test*.java</include>
+                        <include>**/*Test.java</include>
+                        <include>**/*TestCase.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>2.8</version>
+                <configuration>
+                    <deployAtEnd>true</deployAtEnd>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2.0</id>
+            <properties>
+                <hadoop.version>2.0.2-alpha</hadoop.version>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>build-helper-maven-plugin</artifactId>
+                        <version>1.8</version>
+                        <executions>
+                            <execution>
+                                <id>add-source</id>
+                                <phase>generate-sources</phase>
+                                <goals>
+                                    <goal>add-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>src/main/hadoop20</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>cdh-4.4.0</id>
+            <properties>
+                <hadoop.version>2.0.0-cdh4.4.0</hadoop.version>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>build-helper-maven-plugin</artifactId>
+                        <version>1.8</version>
+                        <executions>
+                            <execution>
+                                <id>add-source</id>
+                                <phase>generate-sources</phase>
+                                <goals>
+                                    <goal>add-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>src/main/hadoop20</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>hadoop-2.1</id>
+            <properties>
+                <hadoop.version>2.1.0-beta</hadoop.version>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>build-helper-maven-plugin</artifactId>
+                        <version>1.8</version>
+                        <executions>
+                            <execution>
+                                <id>add-source</id>
+                                <phase>generate-sources</phase>
+                                <goals>
+                                    <goal>add-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>src/main/hadoop21</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>hadoop-2.2</id>
+            <properties>
+                <hadoop.version>2.2.0</hadoop.version>
+            </properties>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>build-helper-maven-plugin</artifactId>
+                        <version>1.8</version>
+                        <executions>
+                            <execution>
+                                <id>add-source</id>
+                                <phase>generate-sources</phase>
+                                <goals>
+                                    <goal>add-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>src/main/hadoop21</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
+    </profiles>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>13.0.1</version>
+            </dependency>
+            <dependency>
+                <groupId>com.google.code.findbugs</groupId>
+                <artifactId>jsr305</artifactId>
+                <version>2.0.1</version>
+            </dependency>
+            <dependency>
+                <groupId>com.google.code.gson</groupId>
+                <artifactId>gson</artifactId>
+                <version>2.2.4</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.zookeeper</groupId>
+                <artifactId>zookeeper</artifactId>
+                <version>3.4.5</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>slf4j-api</artifactId>
+                        <groupId>org.slf4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>slf4j-log4j12</artifactId>
+                        <groupId>org.slf4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>log4j</artifactId>
+                        <groupId>log4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>junit</artifactId>
+                        <groupId>junit</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.jboss.netty</groupId>
+                        <artifactId>netty</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty</artifactId>
+                <version>3.6.6.Final</version>
+            </dependency>
+            <dependency>
+                <groupId>org.xerial.snappy</groupId>
+                <artifactId>snappy-java</artifactId>
+                <version>1.0.4.1</version>
+            </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+                <version>1.7.5</version>
+            </dependency>
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-core</artifactId>
+                <version>1.0.9</version>
+            </dependency>
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-classic</artifactId>
+                <version>1.0.9</version>
+            </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>jcl-over-slf4j</artifactId>
+                <version>1.7.2</version>
+            </dependency>
+            <dependency>
+                <groupId>org.ow2.asm</groupId>
+                <artifactId>asm-all</artifactId>
+                <version>4.0</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-api</artifactId>
+                <version>${hadoop.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.jboss.netty</groupId>
+                        <artifactId>netty</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-common</artifactId>
+                <version>${hadoop.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-client</artifactId>
+                <version>${hadoop.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-common</artifactId>
+                <version>${hadoop.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>commons-logging</groupId>
+                        <artifactId>commons-logging</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.avro</groupId>
+                        <artifactId>avro</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>guava</artifactId>
+                        <groupId>com.google.guava</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>jersey-core</artifactId>
+                        <groupId>com.sun.jersey</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>jersey-json</artifactId>
+                        <groupId>com.sun.jersey</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>jersey-server</artifactId>
+                        <groupId>com.sun.jersey</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>jasper-compiler</artifactId>
+                        <groupId>tomcat</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>jasper-runtime</artifactId>
+                        <groupId>tomcat</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>jsp-api</artifactId>
+                        <groupId>javax.servlet.jsp</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>slf4j-api</artifactId>
+                        <groupId>org.slf4j</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-hdfs</artifactId>
+                <version>${hadoop.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>commons-logging</groupId>
+                        <artifactId>commons-logging</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>tomcat</groupId>
+                        <artifactId>jasper-runtime</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>javax.servlet.jsp</groupId>
+                        <artifactId>jsp-api</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-daemon</groupId>
+                        <artifactId>commons-daemon</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.sun.jersey</groupId>
+                        <artifactId>jersey-core</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-minicluster</artifactId>
+                <version>${hadoop.version}</version>
+                <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>io.netty</groupId>
+                        <artifactId>netty</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>junit</groupId>
+                <artifactId>junit</artifactId>
+                <version>4.11</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-compress</artifactId>
+                <version>1.5</version>
+                <scope>test</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-report-plugin</artifactId>
+                <version>2.14.1</version>
+            </plugin>
+        </plugins>
+    </reporting>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
new file mode 100644
index 0000000..382d104
--- /dev/null
+++ b/yarn/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-yarn</artifactId>
+    <name>Twill Apache Hadoop YARN library</name>
+
+    <properties>
+        <output.dir>target/classes</output.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-discovery-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <outputDirectory>${output.dir}</outputDirectory>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2.0</id>
+            <properties>
+                <output.dir>${hadoop20.output.dir}</output.dir>
+            </properties>
+        </profile>
+        <profile>
+            <id>hadoop-2.1</id>
+            <build>
+                <resources>
+                    <resource>
+                        <directory>${hadoop20.output.dir}</directory>
+                    </resource>
+                    <resource>
+                        <directory>src/main/resources</directory>
+                    </resource>
+                </resources>
+            </build>
+        </profile>
+        <profile>
+            <id>hadoop-2.2</id>
+            <build>
+                <resources>
+                    <resource>
+                        <directory>${hadoop20.output.dir}</directory>
+                    </resource>
+                    <resource>
+                        <directory>src/main/resources</directory>
+                    </resource>
+                </resources>
+            </build>
+        </profile>
+    </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
new file mode 100644
index 0000000..d98dee1
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import org.apache.twill.internal.yarn.ports.AMRMClient;
+import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
+import org.apache.twill.internal.yarn.ports.AllocationResponse;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Application Master side client for Hadoop 2.0 YARN clusters. Registers the AM with the
+ * Resource Manager, performs the allocate() heartbeat, and tracks outstanding container
+ * requests through the ported {@link AMRMClient}.
+ */
+public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
+  // Adapts hadoop ContainerStatus records into the version-neutral YarnContainerStatus facade.
+  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+
+  static {
+    STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
+      @Override
+      public YarnContainerStatus apply(ContainerStatus status) {
+        return new Hadoop20YarnContainerStatus(status);
+      }
+    };
+  }
+
+  private final ContainerId containerId;
+  // Outstanding container requests, keyed by the id returned from ContainerRequestBuilder.apply().
+  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+  private final AMRMClient amrmClient;
+  private final YarnNMClient nmClient;
+  private InetSocketAddress trackerAddr;
+  private URL trackerUrl;
+  private Resource maxCapability;
+  private Resource minCapability;
+
+  public Hadoop20YarnAMClient(Configuration conf) {
+    // The AM container id is handed to the process by the RM through the environment.
+    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+
+    this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
+    this.amrmClient.init(conf);
+    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+    amrmClient.start();
+
+    // Registration returns the cluster resource limits used later by adjustCapability().
+    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
+                                                                                      trackerAddr.getPort(),
+                                                                                      trackerUrl.toString());
+    maxCapability = response.getMaximumResourceCapability();
+    minCapability = response.getMinimumResourceCapability();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+    amrmClient.stop();
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  @Override
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
+  }
+
+  /**
+   * Sets the address and URL the AM reports to the RM at registration time.
+   * Must be called before {@link #startUp()}.
+   */
+  @Override
+  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  /**
+   * Performs one allocate heartbeat: hands newly allocated containers to the handler as
+   * launchers, releases containers the handler did not use, and reports completed containers.
+   */
+  @Override
+  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    AllocationResponse response = amrmClient.allocate(progress);
+    List<ProcessLauncher<YarnContainerInfo>> launchers
+      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+
+    for (Container container : response.getAllocatedContainers()) {
+      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
+    }
+
+    if (!launchers.isEmpty()) {
+      handler.acquired(launchers);
+
+      // If no process has been launched through the given launcher, return the container.
+      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+        // This cast always works: every launcher in the list was created above.
+        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+        if (!launcher.isLaunched()) {
+          Container container = launcher.getContainerInfo().getContainer();
+          LOG.info("Nothing to run in container, releasing it: {}", container);
+          amrmClient.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+
+    List<YarnContainerStatus> completed = ImmutableList.copyOf(
+      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+    if (!completed.isEmpty()) {
+      handler.completed(completed);
+    }
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  /**
+   * Returns a builder whose apply() registers {@code count} requests with the RM under a fresh
+   * UUID, so they can later be withdrawn together via {@link #completeContainerRequest(String)}.
+   */
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (Hadoop20YarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          for (int i = 0; i < count; i++) {
+            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
+                                                                                  priority, 1);
+            containerRequests.put(id, request);
+            amrmClient.addContainerRequest(request);
+          }
+
+          return id;
+        }
+      }
+    };
+  }
+
+  @Override
+  public synchronized void completeContainerRequest(String id) {
+    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+      amrmClient.removeContainerRequest(request);
+    }
+  }
+
+  /**
+   * Clamps the requested virtual cores into the cluster [min, max] range and rounds the
+   * requested memory up to the nearest multiple of the minimum allocation, capped at the
+   * cluster maximum. Mutates and returns the given resource.
+   */
+  private Resource adjustCapability(Resource resource) {
+    int cores = YarnUtils.getVirtualCores(resource);
+    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
+                                YarnUtils.getVirtualCores(minCapability));
+    // Try to set the virtual cores; older versions of YARN don't support it, in which case
+    // setVirtualCores returns without effect and we skip the log line.
+    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+    }
+
+    // Capture the original memory BEFORE mutating the resource; the previous code logged
+    // resource.getMemory() after setMemory(), which printed the new value for both
+    // placeholders and made the "from" part of the message meaningless.
+    int oldMemory = resource.getMemory();
+    int updatedMemory = Math.min(oldMemory, maxCapability.getMemory());
+    int minMemory = minCapability.getMemory();
+    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+
+    if (oldMemory != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      LOG.info("Adjust memory requirement from {} to {} MB.", oldMemory, updatedMemory);
+    }
+
+    return resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
new file mode 100644
index 0000000..bfec34e
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationSubmitter;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Client for submitting and controlling Twill applications on a Hadoop 2.0 YARN cluster,
+ * built on the pre-2.1 {@link YarnClient} API.
+ */
+public final class Hadoop20YarnAppClient extends AbstractIdleService implements YarnAppClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAppClient.class);
+  private final YarnClient yarnClient;
+  // User the application is submitted as; defaults to the local JVM user and is
+  // overwritten by createLauncher(String, TwillSpecification).
+  private String user;
+
+  public Hadoop20YarnAppClient(Configuration configuration) {
+    this.yarnClient = new YarnClientImpl();
+    yarnClient.init(configuration);
+    this.user = System.getProperty("user.name");
+  }
+
+  /**
+   * Creates a launcher for a new application: requests a fresh application id from the RM and
+   * prepares a submission context whose AM container spec is filled in at submit time by the
+   * returned launcher's submitter.
+   */
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
+    // Request for new application
+    final GetNewApplicationResponse response = yarnClient.getNewApplication();
+    final ApplicationId appId = response.getApplicationId();
+
+    // Setup the context for application submission
+    final ApplicationSubmissionContext appSubmissionContext = Records.newRecord(ApplicationSubmissionContext.class);
+    appSubmissionContext.setApplicationId(appId);
+    appSubmissionContext.setApplicationName(twillSpec.getName());
+    appSubmissionContext.setUser(user);
+
+    ApplicationSubmitter submitter = new ApplicationSubmitter() {
+
+      @Override
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
+        ContainerLaunchContext context = launchContext.getLaunchContext();
+        // Token must be added before the context is attached to the submission.
+        addRMToken(context);
+        context.setUser(appSubmissionContext.getUser());
+        context.setResource(adjustMemory(response, capability));
+        appSubmissionContext.setAMContainerSpec(context);
+
+        try {
+          yarnClient.submitApplication(appSubmissionContext);
+          return new ProcessControllerImpl(yarnClient, appId);
+        } catch (YarnRemoteException e) {
+          LOG.error("Failed to submit application {}", appId, e);
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+
+    return new ApplicationMasterProcessLauncher(appId, submitter);
+  }
+
+  // Rounds the requested AM memory up to a multiple of the cluster minimum allocation,
+  // capped at the cluster maximum. Mutates and returns the given capability.
+  private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
+    int minMemory = response.getMinimumResourceCapability().getMemory();
+
+    int updatedMemory = Math.min(capability.getMemory(), response.getMaximumResourceCapability().getMemory());
+    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+
+    if (updatedMemory != capability.getMemory()) {
+      capability.setMemory(updatedMemory);
+    }
+
+    return capability;
+  }
+
+  // Adds the RM delegation token to the container credentials so the launched AM can talk to
+  // the RM on a secure cluster. No-op when security is disabled.
+  private void addRMToken(ContainerLaunchContext context) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    try {
+      Credentials credentials = YarnUtils.decodeCredentials(context.getContainerTokens());
+
+      Configuration config = yarnClient.getConfig();
+      Token<TokenIdentifier> token = convertToken(
+        yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
+        YarnUtils.getRMAddress(config));
+
+      LOG.info("Added RM delegation token {}", token);
+      credentials.addToken(token.getService(), token);
+
+      // Re-encode the augmented credentials back into the launch context.
+      context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
+
+    } catch (Exception e) {
+      LOG.error("Fails to create credentials.", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  // Converts the protocol-level DelegationToken record into a hadoop-security Token,
+  // binding it to the RM service address when one is given.
+  private <T extends TokenIdentifier> Token<T> convertToken(DelegationToken protoToken, InetSocketAddress serviceAddr) {
+    Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
+                                  protoToken.getPassword().array(),
+                                  new Text(protoToken.getKind()),
+                                  new Text(protoToken.getService()));
+    if (serviceAddr != null) {
+      SecurityUtil.setTokenService(token, serviceAddr);
+    }
+    return token;
+  }
+
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
+    // NOTE(review): this mutates the shared 'user' field instead of passing the value through;
+    // concurrent calls with different users could race — confirm callers are single-threaded.
+    this.user = user;
+    return createLauncher(twillSpec);
+  }
+
+  @Override
+  public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
+    return new ProcessControllerImpl(yarnClient, appId);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    yarnClient.start();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    yarnClient.stop();
+  }
+
+  /**
+   * Controller that fetches reports for, and kills, a submitted application via the YarnClient.
+   */
+  private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
+    private final YarnClient yarnClient;
+    private final ApplicationId appId;
+
+    public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
+      this.yarnClient = yarnClient;
+      this.appId = appId;
+    }
+
+    @Override
+    public YarnApplicationReport getReport() {
+      try {
+        return new Hadoop20YarnApplicationReport(yarnClient.getApplicationReport(appId));
+      } catch (YarnRemoteException e) {
+        LOG.error("Failed to get application report {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      try {
+        yarnClient.killApplication(appId);
+      } catch (YarnRemoteException e) {
+        LOG.error("Failed to kill application {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
new file mode 100644
index 0000000..6c1b764
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * Adapter exposing a Hadoop 2.0 {@link ApplicationReport} through the version-neutral
+ * {@link YarnApplicationReport} interface. Every method delegates directly to the wrapped
+ * report without transformation.
+ */
+public final class Hadoop20YarnApplicationReport implements YarnApplicationReport {
+
+  private final ApplicationReport report;
+
+  public Hadoop20YarnApplicationReport(ApplicationReport report) {
+    this.report = report;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return report.getApplicationId();
+  }
+
+  @Override
+  public ApplicationAttemptId getCurrentApplicationAttemptId() {
+    return report.getCurrentApplicationAttemptId();
+  }
+
+  @Override
+  public String getQueue() {
+    return report.getQueue();
+  }
+
+  @Override
+  public String getName() {
+    return report.getName();
+  }
+
+  @Override
+  public String getHost() {
+    return report.getHost();
+  }
+
+  @Override
+  public int getRpcPort() {
+    return report.getRpcPort();
+  }
+
+  @Override
+  public YarnApplicationState getYarnApplicationState() {
+    return report.getYarnApplicationState();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return report.getDiagnostics();
+  }
+
+  @Override
+  public String getTrackingUrl() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public String getOriginalTrackingUrl() {
+    return report.getOriginalTrackingUrl();
+  }
+
+  @Override
+  public long getStartTime() {
+    return report.getStartTime();
+  }
+
+  @Override
+  public long getFinishTime() {
+    return report.getFinishTime();
+  }
+
+  @Override
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return report.getFinalApplicationStatus();
+  }
+
+  @Override
+  public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
+    return report.getApplicationResourceUsageReport();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
new file mode 100644
index 0000000..79b2cb5
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Adapter exposing a Hadoop 2.0 {@link Container} through the version-neutral
+ * {@link YarnContainerInfo} interface.
+ */
+public final class Hadoop20YarnContainerInfo implements YarnContainerInfo {
+
+  private final Container container;
+
+  public Hadoop20YarnContainerInfo(Container container) {
+    this.container = container;
+  }
+
+  // Unchecked cast: callers are expected to request the Hadoop 2.0 Container type.
+  @Override
+  public <T> T getContainer() {
+    return (T) container;
+  }
+
+  @Override
+  public String getId() {
+    return container.getId().toString();
+  }
+
+  // Resolves the node's host name; resolution failure surfaces as an unchecked exception.
+  @Override
+  public InetAddress getHost() {
+    try {
+      return InetAddress.getByName(container.getNodeId().getHost());
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public int getPort() {
+    return container.getNodeId().getPort();
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return container.getResource().getMemory();
+  }
+
+  // Goes through YarnUtils rather than the Resource record directly — presumably because the
+  // virtual-cores API is not uniformly available across YARN versions; confirm in YarnUtils.
+  @Override
+  public int getVirtualCores() {
+    return YarnUtils.getVirtualCores(container.getResource());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
new file mode 100644
index 0000000..cc61856
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * Adapter exposing a Hadoop 2.0 {@link ContainerStatus} through the version-neutral
+ * {@link YarnContainerStatus} interface. All methods delegate to the wrapped status.
+ */
+public final class Hadoop20YarnContainerStatus implements YarnContainerStatus {
+
+  private final ContainerStatus containerStatus;
+
+  public Hadoop20YarnContainerStatus(ContainerStatus containerStatus) {
+    this.containerStatus = containerStatus;
+  }
+
+  @Override
+  public String getContainerId() {
+    return containerStatus.getContainerId().toString();
+  }
+
+  @Override
+  public ContainerState getState() {
+    return containerStatus.getState();
+  }
+
+  @Override
+  public int getExitStatus() {
+    return containerStatus.getExitStatus();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return containerStatus.getDiagnostics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
new file mode 100644
index 0000000..b1f6d66
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Adapter exposing a Hadoop 2.0 {@link ContainerLaunchContext} through the version-neutral
+ * {@link YarnLaunchContext} interface. Wraps a freshly created launch context record and
+ * delegates all setters/getters to it.
+ */
+public final class Hadoop20YarnLaunchContext implements YarnLaunchContext {
+
+  private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM;
+
+  static {
+    // Creates transform function from YarnLocalResource -> LocalResource
+    RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
+      @Override
+      public LocalResource apply(YarnLocalResource input) {
+        return input.getLocalResource();
+      }
+    };
+  }
+
+  private final ContainerLaunchContext launchContext;
+
+  public Hadoop20YarnLaunchContext() {
+    launchContext = Records.newRecord(ContainerLaunchContext.class);
+  }
+
+  // Unchecked cast: callers are expected to request the Hadoop 2.0 ContainerLaunchContext type.
+  @Override
+  public <T> T getLaunchContext() {
+    return (T) launchContext;
+  }
+
+  // Credentials are carried in the pre-2.1 API as an encoded token buffer.
+  @Override
+  public void setCredentials(Credentials credentials) {
+    launchContext.setContainerTokens(YarnUtils.encodeCredentials(credentials));
+  }
+
+  // Lazily unwraps each YarnLocalResource via the transform view (Maps.transformValues).
+  @Override
+  public void setLocalResources(Map<String, YarnLocalResource> localResources) {
+    launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
+  }
+
+  @Override
+  public void setServiceData(Map<String, ByteBuffer> serviceData) {
+    launchContext.setServiceData(serviceData);
+  }
+
+  @Override
+  public Map<String, String> getEnvironment() {
+    return launchContext.getEnvironment();
+  }
+
+  @Override
+  public void setEnvironment(Map<String, String> environment) {
+    launchContext.setEnvironment(environment);
+  }
+
+  @Override
+  public List<String> getCommands() {
+    return launchContext.getCommands();
+  }
+
+  @Override
+  public void setCommands(List<String> commands) {
+    launchContext.setCommands(commands);
+  }
+
+  @Override
+  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
+    launchContext.setApplicationACLs(acls);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
new file mode 100644
index 0000000..b327b94
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Adapter exposing a Hadoop 2.0 {@link LocalResource} through the version-neutral
+ * {@link YarnLocalResource} interface. Wraps a freshly created record and delegates
+ * all accessors and mutators to it.
+ */
+public final class Hadoop20YarnLocalResource implements YarnLocalResource {
+
+  private final LocalResource localResource;
+
+  public Hadoop20YarnLocalResource() {
+    this.localResource = Records.newRecord(LocalResource.class);
+  }
+
+  // Unchecked cast: callers are expected to request the Hadoop 2.0 LocalResource type.
+  @Override
+  public <T> T getLocalResource() {
+    return (T) localResource;
+  }
+
+  @Override
+  public URL getResource() {
+    return localResource.getResource();
+  }
+
+  @Override
+  public void setResource(URL resource) {
+    localResource.setResource(resource);
+  }
+
+  @Override
+  public long getSize() {
+    return localResource.getSize();
+  }
+
+  @Override
+  public void setSize(long size) {
+    localResource.setSize(size);
+  }
+
+  @Override
+  public long getTimestamp() {
+    return localResource.getTimestamp();
+  }
+
+  @Override
+  public void setTimestamp(long timestamp) {
+    localResource.setTimestamp(timestamp);
+  }
+
+  @Override
+  public LocalResourceType getType() {
+    return localResource.getType();
+  }
+
+  @Override
+  public void setType(LocalResourceType type) {
+    localResource.setType(type);
+  }
+
+  @Override
+  public LocalResourceVisibility getVisibility() {
+    return localResource.getVisibility();
+  }
+
+  @Override
+  public void setVisibility(LocalResourceVisibility visibility) {
+    localResource.setVisibility(visibility);
+  }
+
+  @Override
+  public String getPattern() {
+    return localResource.getPattern();
+  }
+
+  @Override
+  public void setPattern(String pattern) {
+    localResource.setPattern(pattern);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
new file mode 100644
index 0000000..98ecc67
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ *
+ */
+/**
+ * Hadoop 2.0 implementation of {@link YarnNMClient} that launches and stops
+ * containers by talking to node managers directly over the
+ * {@link ContainerManager} RPC protocol.
+ */
+public final class Hadoop20YarnNMClient implements YarnNMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnNMClient.class);
+
+  // Delay between successive container-status polls while waiting for a
+  // stopped container to reach the COMPLETE state. Without a delay the wait
+  // loop busy-spins and floods the node manager with RPC requests.
+  private static final long STATUS_POLL_INTERVAL_MILLIS = 200;
+
+  private final YarnRPC yarnRPC;
+  private final Configuration yarnConf;
+
+  public Hadoop20YarnNMClient(YarnRPC yarnRPC, Configuration yarnConf) {
+    this.yarnRPC = yarnRPC;
+    this.yarnConf = yarnConf;
+  }
+
+  /**
+   * Starts the given launch context on the container's node manager.
+   *
+   * @param containerInfo the allocated container to launch in
+   * @param launchContext the context (commands, resources, env) to start
+   * @return a {@link Cancellable} whose {@code cancel()} stops the container
+   *         and blocks until the node manager reports it COMPLETE
+   * @throws RuntimeException wrapping the {@link YarnRemoteException} if the
+   *         start request fails
+   */
+  @Override
+  public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
+    ContainerLaunchContext context = launchContext.getLaunchContext();
+    // Hadoop 2.0 requires the submitting user to be set on the launch context.
+    context.setUser(System.getProperty("user.name"));
+
+    Container container = containerInfo.getContainer();
+
+    context.setContainerId(container.getId());
+    context.setResource(container.getResource());
+
+    StartContainerRequest startRequest = Records.newRecord(StartContainerRequest.class);
+    startRequest.setContainerLaunchContext(context);
+
+    ContainerManager manager = connectContainerManager(container);
+    try {
+      manager.startContainer(startRequest);
+      return new ContainerTerminator(container, manager);
+    } catch (YarnRemoteException e) {
+      LOG.error("Error in launching process", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Helper to connect to container manager (node manager) of the given
+   * container's node via YARN RPC.
+   */
+  private ContainerManager connectContainerManager(Container container) {
+    String cmIpPortStr = String.format("%s:%d", container.getNodeId().getHost(), container.getNodeId().getPort());
+    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+    return ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddress, yarnConf));
+  }
+
+  /**
+   * Stops a previously started container and waits (by polling) until the
+   * node manager reports the container COMPLETE.
+   */
+  private static final class ContainerTerminator implements Cancellable {
+
+    private final Container container;
+    private final ContainerManager manager;
+
+    private ContainerTerminator(Container container, ContainerManager manager) {
+      this.container = container;
+      this.manager = manager;
+    }
+
+    @Override
+    public void cancel() {
+      LOG.info("Request to stop container {}.", container.getId());
+      StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
+      stopRequest.setContainerId(container.getId());
+      try {
+        manager.stopContainer(stopRequest);
+        boolean completed = false;
+        while (!completed) {
+          GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class);
+          statusRequest.setContainerId(container.getId());
+          GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest);
+          LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
+
+          completed = (statusResponse.getStatus().getState() == ContainerState.COMPLETE);
+          if (!completed) {
+            // Back off between polls instead of busy-spinning against the NM.
+            try {
+              Thread.sleep(STATUS_POLL_INTERVAL_MILLIS);
+            } catch (InterruptedException ie) {
+              // Stop request was already sent; restore the interrupt status
+              // and stop waiting for completion.
+              Thread.currentThread().interrupt();
+              break;
+            }
+          }
+        }
+        LOG.info("Container {} stopped.", container.getId());
+      } catch (YarnRemoteException e) {
+        LOG.error("Fail to stop container {}", container.getId(), e);
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
new file mode 100644
index 0000000..26b6fa2
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+/**
+ * Ported from Apache Hadoop YARN.
+ */
+public interface AMRMClient extends Service {
+
+  /**
+   * Value used to define no locality (any host or rack is acceptable).
+   */
+  static final String ANY = "*";
+
+  /**
+   * Object to represent container request for resources.
+   * Resources may be localized to nodes and racks.
+   * Resources may be assigned priorities.
+   * Can ask for multiple containers of a given type.
+   */
+  public static class ContainerRequest {
+    Resource capability;
+    String[] hosts;
+    String[] racks;
+    Priority priority;
+    int containerCount;
+
+    // Host and rack arrays are defensively cloned so later mutation by the
+    // caller cannot alter an already-submitted request.
+    public ContainerRequest(Resource capability, String[] hosts,
+                            String[] racks, Priority priority, int containerCount) {
+      this.capability = capability;
+      this.hosts = (hosts != null ? hosts.clone() : null);
+      this.racks = (racks != null ? racks.clone() : null);
+      this.priority = priority;
+      this.containerCount = containerCount;
+    }
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Capability[").append(capability).append("]");
+      sb.append("Priority[").append(priority).append("]");
+      sb.append("ContainerCount[").append(containerCount).append("]");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Register the application master. This must be called before any
+   * other interaction
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @return <code>RegisterApplicationMasterResponse</code>
+   * @throws org.apache.hadoop.yarn.exceptions.YarnRemoteException
+   */
+  public RegisterApplicationMasterResponse
+  registerApplicationMaster(String appHostName,
+                            int appHostPort,
+                            String appTrackingUrl)
+    throws YarnRemoteException;
+
+  /**
+   * Request additional containers and receive new container allocations.
+   * Requests made via <code>addContainerRequest</code> are sent to the
+   * <code>ResourceManager</code>. New containers assigned to the master are
+   * retrieved. Status of completed containers and node health updates are
+   * also retrieved.
+   * This also doubles as a heartbeat to the ResourceManager and must be
+   * made periodically.
+   * The call may not always return any new allocations of containers.
+   * App should not make concurrent allocate requests. May cause request loss.
+   * @param progressIndicator Indicates progress made by the master
+   * @return the response of the allocate request
+   * @throws YarnRemoteException
+   */
+  public AllocationResponse allocate(float progressIndicator)
+    throws YarnRemoteException;
+
+  /**
+   * Unregister the Application Master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnRemoteException
+   */
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+                                          String appMessage,
+                                          String appTrackingUrl)
+    throws YarnRemoteException;
+
+  /**
+   * Request containers for resources before calling <code>allocate</code>.
+   * @param req Resource request
+   */
+  public void addContainerRequest(ContainerRequest req);
+
+  /**
+   * Remove previous container request. The previous container request may have
+   * already been sent to the ResourceManager, so even after the remove request
+   * the app must be prepared to receive an allocation for the previous request.
+   * @param req Resource request
+   */
+  public void removeContainerRequest(ContainerRequest req);
+
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release it.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it (for example, if it released non-local resources).
+   * @param containerId Id of the container to release
+   */
+  public void releaseAssignedContainer(ContainerId containerId);
+
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public Resource getClusterAvailableResources();
+
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public int getClusterNodeCount();
+}