You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/11/21 22:54:31 UTC
[08/15] Initial import commit.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..e6f0522
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,556 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.twill</groupId>
+ <artifactId>twill-parent</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>Twill library parent project</name>
+
+ <modules>
+ <module>common</module>
+ <module>discovery-api</module>
+ <module>api</module>
+ <module>zookeeper</module>
+ <module>discovery-core</module>
+ <module>core</module>
+ <module>yarn</module>
+ </modules>
+
+ <repositories>
+ <repository>
+ <id>cloudera-releases</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
+ <hadoop.version>[2.0.2-alpha,2.2.0]</hadoop.version>
+ <hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
+ </properties>
+
+ <scm>
+ <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-twill.git</connection>
+ <url>https://git-wip-us.apache.org/repos/asf?p=incubator-twill.git;a=summary</url>
+ <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-twill.git</developerConnection>
+ </scm>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <configuration>
+ <excludeResources>true</excludeResources>
+ </configuration>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9</version>
+ <configuration>
+ <excludePackageNames>*.internal.*</excludePackageNames>
+ <links>
+ <link>http://download.oracle.com/javase/6/docs/api/</link>
+ </links>
+ <bottom>
+ <![CDATA[Copyright © 2013 <a href="http://www.apache.org">The Apache Software Foundation</a>. All rights reserved.]]>
+ </bottom>
+ </configuration>
+ <executions>
+ <execution>
+ <id>attach-javadoc</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-release-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <autoVersionSubmodules>true</autoVersionSubmodules>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.2</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.14.1</version>
+ <configuration>
+ <argLine>-Xmx512m</argLine>
+ <redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
+ <systemPropertyVariables>
+ <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+ </systemPropertyVariables>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/Test*.java</include>
+ <include>**/*Test.java</include>
+ <include>**/*TestCase.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <deployAtEnd>true</deployAtEnd>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop-2.0</id>
+ <properties>
+ <hadoop.version>2.0.2-alpha</hadoop.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/hadoop20</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>cdh-4.4.0</id>
+ <properties>
+ <hadoop.version>2.0.0-cdh4.4.0</hadoop.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/hadoop20</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop-2.1</id>
+ <properties>
+ <hadoop.version>2.1.0-beta</hadoop.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/hadoop21</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop-2.2</id>
+ <properties>
+ <hadoop.version>2.2.0</hadoop.version>
+ </properties>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/hadoop21</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ </profiles>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>13.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>2.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.2.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>junit</artifactId>
+ <groupId>junit</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.6.6.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.4.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+   <groupId>org.slf4j</groupId>
+   <artifactId>jcl-over-slf4j</artifactId>
+   <!-- Kept on the same release as the managed slf4j-api (1.7.5) so SLF4J artifacts are not mixed. -->
+   <version>1.7.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-all</artifactId>
+ <version>4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jasper-compiler</artifactId>
+ <groupId>tomcat</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jasper-runtime</artifactId>
+ <groupId>tomcat</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-daemon</groupId>
+ <artifactId>commons-daemon</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.5</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.14.1</version>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
new file mode 100644
index 0000000..382d104
--- /dev/null
+++ b/yarn/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>1.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-yarn</artifactId>
+ <name>Twill Apache Hadoop YARN library</name>
+
+ <properties>
+ <output.dir>target/classes</output.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-discovery-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>${output.dir}</outputDirectory>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop-2.0</id>
+ <properties>
+ <output.dir>${hadoop20.output.dir}</output.dir>
+ </properties>
+ </profile>
+ <profile>
+ <id>hadoop-2.1</id>
+ <build>
+ <resources>
+ <resource>
+ <directory>${hadoop20.output.dir}</directory>
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop-2.2</id>
+ <build>
+ <resources>
+ <resource>
+ <directory>${hadoop20.output.dir}</directory>
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+ </profile>
+ </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
new file mode 100644
index 0000000..d98dee1
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import org.apache.twill.internal.yarn.ports.AMRMClient;
+import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
+import org.apache.twill.internal.yarn.ports.AllocationResponse;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * {@link YarnAMClient} implementation that lives in the {@code src/main/hadoop20} source tree.
+ * It delegates all resource manager communication to an {@link AMRMClient} and remembers the container
+ * requests it has issued so that they can later be removed by id.
+ */
+public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
+
+  /** Wraps a YARN {@link ContainerStatus} into a {@link Hadoop20YarnContainerStatus}. */
+  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM =
+    new Function<ContainerStatus, YarnContainerStatus>() {
+      @Override
+      public YarnContainerStatus apply(ContainerStatus status) {
+        return new Hadoop20YarnContainerStatus(status);
+      }
+    };
+
+  private final ContainerId containerId;
+  /** Issued requests keyed by the id returned from the request builder; accessed while holding {@code this}. */
+  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+  private final AMRMClient amrmClient;
+  private final YarnNMClient nmClient;
+  private InetSocketAddress trackerAddr;
+  private URL trackerUrl;
+  /** Resource limits returned by the registration call in {@link #startUp()}; null before that. */
+  private Resource maxCapability;
+  private Resource minCapability;
+
+  /**
+   * Creates a client for the container whose id is found in the
+   * {@link ApplicationConstants#AM_CONTAINER_ID_ENV} environment variable.
+   *
+   * @param conf configuration used to initialize the resource manager and node manager clients
+   * @throws IllegalArgumentException if the container id is missing from the environment
+   */
+  public Hadoop20YarnAMClient(Configuration conf) {
+    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+
+    this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
+    this.amrmClient.init(conf);
+    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
+  }
+
+  /**
+   * Starts the underlying client and registers the application master with the tracker address and URL
+   * given to {@link #setTracker(InetSocketAddress, URL)}, which must have been called beforehand.
+   * The resource limits returned by the registration are kept for later capability adjustment.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+    amrmClient.start();
+
+    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
+                                                                                      trackerAddr.getPort(),
+                                                                                      trackerUrl.toString());
+    maxCapability = response.getMaximumResourceCapability();
+    minCapability = response.getMinimumResourceCapability();
+  }
+
+  /**
+   * Unregisters the application master with a {@link FinalApplicationStatus#SUCCEEDED} status and stops the
+   * underlying client.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    try {
+      amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+    } finally {
+      // Stop the client even when unregistering fails, so it is never left running.
+      amrmClient.stop();
+    }
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * Returns the value of the {@link ApplicationConstants#NM_HOST_ENV} environment variable,
+   * or {@code null} if it is not set.
+   */
+  @Override
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
+  }
+
+  @Override
+  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  /**
+   * Performs one allocate call against the resource manager. Newly allocated containers are handed to
+   * {@link AllocateHandler#acquired}; any container that was not launched by the handler is released again.
+   * Completed container statuses, if any, are then passed to {@link AllocateHandler#completed}.
+   *
+   * @param progress progress value forwarded to the resource manager
+   * @param handler callback receiving the acquired launchers and completed statuses
+   */
+  @Override
+  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    AllocationResponse response = amrmClient.allocate(progress);
+    List<ProcessLauncher<YarnContainerInfo>> launchers
+      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+
+    for (Container container : response.getAllocatedContainers()) {
+      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
+    }
+
+    if (!launchers.isEmpty()) {
+      handler.acquired(launchers);
+
+      // If no process has been launched through the given launcher, return the container.
+      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+        // This cast always works, every element was created above as a RunnableProcessLauncher.
+        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+        if (!launcher.isLaunched()) {
+          Container container = launcher.getContainerInfo().getContainer();
+          LOG.info("Nothing to run in container, releasing it: {}", container);
+          amrmClient.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+
+    List<YarnContainerStatus> completed = ImmutableList.copyOf(
+      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+    if (!completed.isEmpty()) {
+      handler.completed(completed);
+    }
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  /**
+   * Returns a builder for {@code count} containers of the given capability. The capability object is
+   * adjusted in place to the limits of the resource manager before the builder is created. Calling
+   * {@code apply()} on the builder issues one request per container and returns a generated id under
+   * which all of them are recorded.
+   */
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (Hadoop20YarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          // An empty host/rack collection is passed on as null rather than as an empty array.
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          for (int i = 0; i < count; i++) {
+            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
+                                                                                  priority, 1);
+            containerRequests.put(id, request);
+            amrmClient.addContainerRequest(request);
+          }
+
+          return id;
+        }
+      }
+    };
+  }
+
+  /**
+   * Removes all requests recorded under the given id from the underlying client.
+   * An unknown id results in no action.
+   */
+  @Override
+  public synchronized void completeContainerRequest(String id) {
+    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+      amrmClient.removeContainerRequest(request);
+    }
+  }
+
+  /**
+   * Adjusts the given resource in place: virtual cores are clamped between the minimum and maximum
+   * capability, memory is capped at the maximum and then rounded up to a multiple of the minimum.
+   *
+   * @param resource the requested capability; modified in place
+   * @return the same {@code resource} instance
+   */
+  private Resource adjustCapability(Resource resource) {
+    int cores = YarnUtils.getVirtualCores(resource);
+    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
+                                YarnUtils.getVirtualCores(minCapability));
+    // Try to set the virtual cores; older versions of YARN don't support this.
+    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+    }
+
+    // Remember the requested value before mutating the resource so the log shows the real "from" value.
+    int requestedMemory = resource.getMemory();
+    int minMemory = minCapability.getMemory();
+    int updatedMemory = Math.min(requestedMemory, maxCapability.getMemory());
+    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+
+    if (requestedMemory != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      LOG.info("Adjust memory requirement from {} to {} MB.", requestedMemory, updatedMemory);
+    }
+
+    return resource;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
new file mode 100644
index 0000000..bfec34e
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationSubmitter;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * {@link YarnAppClient} implementation that lives in the {@code src/main/hadoop20} source tree.
+ * Applications are created, submitted, queried and killed through a {@link YarnClient}.
+ */
+public final class Hadoop20YarnAppClient extends AbstractIdleService implements YarnAppClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAppClient.class);
+  private final YarnClient yarnClient;
+  // User set on submitted applications. Starts as the "user.name" system property and is
+  // overwritten by createLauncher(String, TwillSpecification).
+  private String user;
+
+  public Hadoop20YarnAppClient(Configuration configuration) {
+    this.user = System.getProperty("user.name");
+    this.yarnClient = new YarnClientImpl();
+    this.yarnClient.init(configuration);
+  }
+
+  /**
+   * Obtains a new application id from YARN and returns a launcher that submits the application
+   * master for the given specification under that id.
+   */
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
+    // Ask YARN for a new application
+    final GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+    final ApplicationId applicationId = newApp.getApplicationId();
+
+    // Prepare the submission context; the AM container spec is filled in at submit time
+    final ApplicationSubmissionContext submissionContext = Records.newRecord(ApplicationSubmissionContext.class);
+    submissionContext.setApplicationId(applicationId);
+    submissionContext.setApplicationName(twillSpec.getName());
+    submissionContext.setUser(user);
+
+    return new ApplicationMasterProcessLauncher(applicationId, new ApplicationSubmitter() {
+
+      @Override
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
+        ContainerLaunchContext amContainer = launchContext.getLaunchContext();
+        addRMToken(amContainer);
+        amContainer.setUser(submissionContext.getUser());
+        amContainer.setResource(adjustMemory(newApp, capability));
+        submissionContext.setAMContainerSpec(amContainer);
+
+        try {
+          yarnClient.submitApplication(submissionContext);
+          return new ProcessControllerImpl(yarnClient, applicationId);
+        } catch (YarnRemoteException e) {
+          LOG.error("Failed to submit application {}", applicationId, e);
+          throw Throwables.propagate(e);
+        }
+      }
+    });
+  }
+
+  /**
+   * Caps the memory of the given capability at the maximum reported in the response and rounds it up
+   * to a multiple of the reported minimum. The capability is modified in place and returned.
+   */
+  private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
+    int minMemory = response.getMinimumResourceCapability().getMemory();
+    int requestedMemory = capability.getMemory();
+
+    int cappedMemory = Math.min(requestedMemory, response.getMaximumResourceCapability().getMemory());
+    int roundedMemory = (int) Math.ceil(((double) cappedMemory / minMemory)) * minMemory;
+
+    if (roundedMemory != requestedMemory) {
+      capability.setMemory(roundedMemory);
+    }
+    return capability;
+  }
+
+  /**
+   * When security is enabled, fetches a delegation token through the YARN client and adds it to the
+   * credentials carried in the container tokens of the given launch context. Does nothing otherwise.
+   */
+  private void addRMToken(ContainerLaunchContext context) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    try {
+      Credentials credentials = YarnUtils.decodeCredentials(context.getContainerTokens());
+
+      Configuration config = yarnClient.getConfig();
+      Text renewer = new Text(YarnUtils.getYarnTokenRenewer(config));
+      DelegationToken delegationToken = yarnClient.getRMDelegationToken(renewer);
+      InetSocketAddress rmAddress = YarnUtils.getRMAddress(config);
+      Token<TokenIdentifier> token = convertToken(delegationToken, rmAddress);
+
+      LOG.info("Added RM delegation token {}", token);
+      credentials.addToken(token.getService(), token);
+
+      context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
+
+    } catch (Exception e) {
+      LOG.error("Fails to create credentials.", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Builds a {@link Token} from the given YARN delegation token record. If a service address is
+   * given, it is set as the token service.
+   */
+  private <T extends TokenIdentifier> Token<T> convertToken(DelegationToken protoToken, InetSocketAddress serviceAddr) {
+    byte[] identifier = protoToken.getIdentifier().array();
+    byte[] password = protoToken.getPassword().array();
+    Text kind = new Text(protoToken.getKind());
+    Text service = new Text(protoToken.getService());
+
+    Token<T> token = new Token<T>(identifier, password, kind, service);
+    if (serviceAddr != null) {
+      SecurityUtil.setTokenService(token, serviceAddr);
+    }
+    return token;
+  }
+
+  /**
+   * Same as {@link #createLauncher(TwillSpecification)}, but first replaces the user kept by this
+   * client with the given one.
+   */
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
+    this.user = user;
+    return createLauncher(twillSpec);
+  }
+
+  @Override
+  public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
+    return new ProcessControllerImpl(yarnClient, appId);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    yarnClient.start();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    yarnClient.stop();
+  }
+
+  /**
+   * {@link ProcessController} that reports on and kills a single application through the YARN client.
+   */
+  private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
+    private final YarnClient yarnClient;
+    private final ApplicationId appId;
+
+    public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
+      this.yarnClient = yarnClient;
+      this.appId = appId;
+    }
+
+    @Override
+    public YarnApplicationReport getReport() {
+      try {
+        return new Hadoop20YarnApplicationReport(yarnClient.getApplicationReport(appId));
+      } catch (YarnRemoteException e) {
+        LOG.error("Failed to get application report {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      try {
+        yarnClient.killApplication(appId);
+      } catch (YarnRemoteException e) {
+        LOG.error("Failed to kill application {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
new file mode 100644
index 0000000..6c1b764
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ *
+ */
+public final class Hadoop20YarnApplicationReport implements YarnApplicationReport {
+
+ private final ApplicationReport report;
+
+ public Hadoop20YarnApplicationReport(ApplicationReport report) {
+ this.report = report;
+ }
+
+ @Override
+ public ApplicationId getApplicationId() {
+ return report.getApplicationId();
+ }
+
+ @Override
+ public ApplicationAttemptId getCurrentApplicationAttemptId() {
+ return report.getCurrentApplicationAttemptId();
+ }
+
+ @Override
+ public String getQueue() {
+ return report.getQueue();
+ }
+
+ @Override
+ public String getName() {
+ return report.getName();
+ }
+
+ @Override
+ public String getHost() {
+ return report.getHost();
+ }
+
+ @Override
+ public int getRpcPort() {
+ return report.getRpcPort();
+ }
+
+ @Override
+ public YarnApplicationState getYarnApplicationState() {
+ return report.getYarnApplicationState();
+ }
+
+ @Override
+ public String getDiagnostics() {
+ return report.getDiagnostics();
+ }
+
+ @Override
+ public String getTrackingUrl() {
+ return report.getTrackingUrl();
+ }
+
+ @Override
+ public String getOriginalTrackingUrl() {
+ return report.getOriginalTrackingUrl();
+ }
+
+ @Override
+ public long getStartTime() {
+ return report.getStartTime();
+ }
+
+ @Override
+ public long getFinishTime() {
+ return report.getFinishTime();
+ }
+
+ @Override
+ public FinalApplicationStatus getFinalApplicationStatus() {
+ return report.getFinalApplicationStatus();
+ }
+
+ @Override
+ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
+ return report.getApplicationResourceUsageReport();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
new file mode 100644
index 0000000..79b2cb5
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ *
+ */
+public final class Hadoop20YarnContainerInfo implements YarnContainerInfo {
+
+ private final Container container;
+
+ public Hadoop20YarnContainerInfo(Container container) {
+ this.container = container;
+ }
+
+ @Override
+ public <T> T getContainer() {
+ return (T) container;
+ }
+
+ @Override
+ public String getId() {
+ return container.getId().toString();
+ }
+
+ @Override
+ public InetAddress getHost() {
+ try {
+ return InetAddress.getByName(container.getNodeId().getHost());
+ } catch (UnknownHostException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public int getPort() {
+ return container.getNodeId().getPort();
+ }
+
+ @Override
+ public int getMemoryMB() {
+ return container.getResource().getMemory();
+ }
+
+ @Override
+ public int getVirtualCores() {
+ return YarnUtils.getVirtualCores(container.getResource());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
new file mode 100644
index 0000000..cc61856
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * {@link YarnContainerStatus} view over a Hadoop 2.0 {@link ContainerStatus} record.
+ * Every accessor forwards directly to the wrapped status.
+ */
+public final class Hadoop20YarnContainerStatus implements YarnContainerStatus {
+
+  private final ContainerStatus delegate;
+
+  /**
+   * @param containerStatus the container status record to expose
+   */
+  public Hadoop20YarnContainerStatus(ContainerStatus containerStatus) {
+    this.delegate = containerStatus;
+  }
+
+  /**
+   * Returns the string form of the container id carried by the status.
+   */
+  @Override
+  public String getContainerId() {
+    return delegate.getContainerId().toString();
+  }
+
+  @Override
+  public ContainerState getState() {
+    return delegate.getState();
+  }
+
+  @Override
+  public int getExitStatus() {
+    return delegate.getExitStatus();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return delegate.getDiagnostics();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
new file mode 100644
index 0000000..b1f6d66
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public final class Hadoop20YarnLaunchContext implements YarnLaunchContext {
+
+ private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM;
+
+ static {
+ // Creates transform function from YarnLocalResource -> LocalResource
+ RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
+ @Override
+ public LocalResource apply(YarnLocalResource input) {
+ return input.getLocalResource();
+ }
+ };
+ }
+
+ private final ContainerLaunchContext launchContext;
+
+ public Hadoop20YarnLaunchContext() {
+ launchContext = Records.newRecord(ContainerLaunchContext.class);
+ }
+
+ @Override
+ public <T> T getLaunchContext() {
+ return (T) launchContext;
+ }
+
+ @Override
+ public void setCredentials(Credentials credentials) {
+ launchContext.setContainerTokens(YarnUtils.encodeCredentials(credentials));
+ }
+
+ @Override
+ public void setLocalResources(Map<String, YarnLocalResource> localResources) {
+ launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
+ }
+
+ @Override
+ public void setServiceData(Map<String, ByteBuffer> serviceData) {
+ launchContext.setServiceData(serviceData);
+ }
+
+ @Override
+ public Map<String, String> getEnvironment() {
+ return launchContext.getEnvironment();
+ }
+
+ @Override
+ public void setEnvironment(Map<String, String> environment) {
+ launchContext.setEnvironment(environment);
+ }
+
+ @Override
+ public List<String> getCommands() {
+ return launchContext.getCommands();
+ }
+
+ @Override
+ public void setCommands(List<String> commands) {
+ launchContext.setCommands(commands);
+ }
+
+ @Override
+ public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
+ launchContext.setApplicationACLs(acls);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
new file mode 100644
index 0000000..b327b94
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@link YarnLocalResource} that wraps a freshly created Hadoop 2.0 {@link LocalResource} record.
+ * Every getter and setter forwards directly to the wrapped record.
+ */
+public final class Hadoop20YarnLocalResource implements YarnLocalResource {
+
+  private final LocalResource delegate;
+
+  /**
+   * Creates a local resource backed by a new, empty {@link LocalResource} record.
+   */
+  public Hadoop20YarnLocalResource() {
+    this.delegate = Records.newRecord(LocalResource.class);
+  }
+
+  /**
+   * Returns the wrapped record, cast (unchecked) to whatever type the caller asks for.
+   */
+  @Override
+  public <T> T getLocalResource() {
+    return (T) delegate;
+  }
+
+  @Override
+  public URL getResource() {
+    return delegate.getResource();
+  }
+
+  @Override
+  public void setResource(URL resource) {
+    delegate.setResource(resource);
+  }
+
+  @Override
+  public long getSize() {
+    return delegate.getSize();
+  }
+
+  @Override
+  public void setSize(long size) {
+    delegate.setSize(size);
+  }
+
+  @Override
+  public long getTimestamp() {
+    return delegate.getTimestamp();
+  }
+
+  @Override
+  public void setTimestamp(long timestamp) {
+    delegate.setTimestamp(timestamp);
+  }
+
+  @Override
+  public LocalResourceType getType() {
+    return delegate.getType();
+  }
+
+  @Override
+  public void setType(LocalResourceType type) {
+    delegate.setType(type);
+  }
+
+  @Override
+  public LocalResourceVisibility getVisibility() {
+    return delegate.getVisibility();
+  }
+
+  @Override
+  public void setVisibility(LocalResourceVisibility visibility) {
+    delegate.setVisibility(visibility);
+  }
+
+  @Override
+  public String getPattern() {
+    return delegate.getPattern();
+  }
+
+  @Override
+  public void setPattern(String pattern) {
+    delegate.setPattern(pattern);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
new file mode 100644
index 0000000..98ecc67
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * {@link YarnNMClient} for Hadoop 2.0. It talks to a node manager directly over the
+ * {@link ContainerManager} protocol to start containers, and hands back a {@link Cancellable}
+ * that stops the started container.
+ */
+public final class Hadoop20YarnNMClient implements YarnNMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnNMClient.class);
+
+  private final YarnRPC yarnRPC;
+  private final Configuration yarnConf;
+
+  /**
+   * @param yarnRPC RPC factory used to create {@link ContainerManager} proxies
+   * @param yarnConf configuration passed to the RPC factory when creating proxies
+   */
+  public Hadoop20YarnNMClient(YarnRPC yarnRPC, Configuration yarnConf) {
+    this.yarnRPC = yarnRPC;
+    this.yarnConf = yarnConf;
+  }
+
+  /**
+   * Starts the given container with the given launch context.
+   * The launch context is completed with the current {@code user.name} system property and with
+   * the container's id and resource before the start request is sent to the container's node.
+   *
+   * @param containerInfo describes the container to start
+   * @param launchContext launch settings for the container
+   * @return a {@link Cancellable} that stops the container and waits for it to complete
+   * @throws RuntimeException wrapping the {@link YarnRemoteException} if the start request fails
+   */
+  @Override
+  public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
+    ContainerLaunchContext context = launchContext.getLaunchContext();
+    context.setUser(System.getProperty("user.name"));
+
+    Container container = containerInfo.getContainer();
+
+    context.setContainerId(container.getId());
+    context.setResource(container.getResource());
+
+    StartContainerRequest startRequest = Records.newRecord(StartContainerRequest.class);
+    startRequest.setContainerLaunchContext(context);
+
+    ContainerManager manager = connectContainerManager(container);
+    try {
+      manager.startContainer(startRequest);
+      return new ContainerTerminator(container, manager);
+    } catch (YarnRemoteException e) {
+      LOG.error("Error in launching process", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Helper to connect to container manager (node manager).
+   * The address is built from the host and port of the container's node id.
+   */
+  private ContainerManager connectContainerManager(Container container) {
+    String cmIpPortStr = String.format("%s:%d", container.getNodeId().getHost(), container.getNodeId().getPort());
+    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+    return ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddress, yarnConf));
+  }
+
+  /**
+   * Stops a started container and blocks until the node manager reports it as complete.
+   */
+  private static final class ContainerTerminator implements Cancellable {
+
+    // Pause between status polls so that cancel() does not spin on node manager RPCs.
+    private static final long STATUS_POLL_INTERVAL_MS = 200L;
+
+    private final Container container;
+    private final ContainerManager manager;
+
+    private ContainerTerminator(Container container, ContainerManager manager) {
+      this.container = container;
+      this.manager = manager;
+    }
+
+    /**
+     * Sends a stop request for the container, then polls its status until it is
+     * {@link ContainerState#COMPLETE}, sleeping briefly between polls.
+     * If the calling thread is interrupted while waiting, the wait continues and the
+     * interrupt flag is restored before returning.
+     *
+     * @throws RuntimeException wrapping the {@link YarnRemoteException} if a remote call fails
+     */
+    @Override
+    public void cancel() {
+      LOG.info("Request to stop container {}.", container.getId());
+      StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
+      stopRequest.setContainerId(container.getId());
+      boolean interrupted = false;
+      try {
+        manager.stopContainer(stopRequest);
+        while (!isCompleted()) {
+          try {
+            Thread.sleep(STATUS_POLL_INTERVAL_MS);
+          } catch (InterruptedException ie) {
+            // Keep waiting for completion as before; remember the interrupt for the caller.
+            interrupted = true;
+          }
+        }
+        LOG.info("Container {} stopped.", container.getId());
+      } catch (YarnRemoteException e) {
+        LOG.error("Fail to stop container {}", container.getId(), e);
+        throw Throwables.propagate(e);
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    /**
+     * Queries the node manager once for the container status, logs it, and tells whether the
+     * container has completed.
+     */
+    private boolean isCompleted() throws YarnRemoteException {
+      GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class);
+      statusRequest.setContainerId(container.getId());
+      GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest);
+      LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
+
+      return statusResponse.getStatus().getState() == ContainerState.COMPLETE;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
new file mode 100644
index 0000000..26b6fa2
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+/**
+ * Client used by an application master to communicate with the YARN ResourceManager: it
+ * registers the master, queues and withdraws container requests, exchanges them through
+ * periodic {@link #allocate(float)} calls, and finally unregisters the master.
+ * Ported from Apache Hadoop YARN.
+ */
+public interface AMRMClient extends Service {
+
+ /**
+ * Value used to define no locality.
+ */
+ static final String ANY = "*";
+
+ /**
+ * Object to represent a container request for resources.
+ * Resources may be localized to nodes and racks.
+ * Resources may be assigned priorities.
+ * Can ask for multiple containers of a given type.
+ */
+ public static class ContainerRequest {
+ Resource capability;
+ String[] hosts;
+ String[] racks;
+ Priority priority;
+ int containerCount;
+
+ /**
+ * Creates a request. The {@code hosts} and {@code racks} arrays are cloned when non-null,
+ * so later changes to the caller's arrays do not affect this request; {@code null} is kept as is.
+ */
+ public ContainerRequest(Resource capability, String[] hosts,
+ String[] racks, Priority priority, int containerCount) {
+ this.capability = capability;
+ this.hosts = (hosts != null ? hosts.clone() : null);
+ this.racks = (racks != null ? racks.clone() : null);
+ this.priority = priority;
+ this.containerCount = containerCount;
+ }
+ // Describes capability, priority and container count only; hosts and racks are not included.
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Capability[").append(capability).append("]");
+ sb.append("Priority[").append(priority).append("]");
+ sb.append("ContainerCount[").append(containerCount).append("]");
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Register the application master. This must be called before any
+ * other interaction.
+ * @param appHostName Name of the host on which master is running
+ * @param appHostPort Port master is listening on
+ * @param appTrackingUrl URL at which the master info can be seen
+ * @return <code>RegisterApplicationMasterResponse</code>
+ * @throws org.apache.hadoop.yarn.exceptions.YarnRemoteException if the registration request fails
+ */
+ public RegisterApplicationMasterResponse
+ registerApplicationMaster(String appHostName,
+ int appHostPort,
+ String appTrackingUrl)
+ throws YarnRemoteException;
+
+ /**
+ * Request additional containers and receive new container allocations.
+ * Requests made via <code>addContainerRequest</code> are sent to the
+ * <code>ResourceManager</code>. New containers assigned to the master are
+ * retrieved. Status of completed containers and node health updates are
+ * also retrieved.
+ * This also doubles as a heartbeat to the ResourceManager and must be
+ * made periodically.
+ * The call may not always return any new allocations of containers.
+ * The app should not make concurrent allocate requests, as that may cause request loss.
+ * @param progressIndicator Indicates progress made by the master
+ * @return the response of the allocate request
+ * @throws YarnRemoteException if the allocate request fails
+ */
+ public AllocationResponse allocate(float progressIndicator)
+ throws YarnRemoteException;
+
+ /**
+ * Unregister the Application Master. This must be called in the end.
+ * @param appStatus Success/Failure status of the master
+ * @param appMessage Diagnostics message on failure
+ * @param appTrackingUrl New URL to get master info
+ * @throws YarnRemoteException if the unregistration request fails
+ */
+ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+ String appMessage,
+ String appTrackingUrl)
+ throws YarnRemoteException;
+
+ /**
+ * Request containers for resources before calling <code>allocate</code>.
+ * @param req Resource request
+ */
+ public void addContainerRequest(ContainerRequest req);
+
+ /**
+ * Remove previous container request. The previous container request may have
+ * already been sent to the ResourceManager. So even after the remove request
+ * the app must be prepared to receive an allocation for the previous request.
+ * @param req Resource request
+ */
+ public void removeContainerRequest(ContainerRequest req);
+
+ /**
+ * Release containers assigned by the Resource Manager. If the app cannot use
+ * the container or wants to give up the container then it can release it.
+ * The app needs to make new requests for the released resource capability if
+ * it still needs it, for example if it released non-local resources.
+ * @param containerId ID of the container to release
+ */
+ public void releaseAssignedContainer(ContainerId containerId);
+
+ /**
+ * Get the currently available resources in the cluster.
+ * A valid value is available after a call to allocate has been made.
+ * @return Currently available resources
+ */
+ public Resource getClusterAvailableResources();
+
+ /**
+ * Get the current number of nodes in the cluster.
+ * A valid value is available after a call to allocate has been made.
+ * @return Current number of nodes in the cluster
+ */
+ public int getClusterNodeCount();
+}