You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:31 UTC

[15/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
new file mode 100644
index 0000000..508cadb
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.ApplicationBundler;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+/**
+ * Tests that a jar created by {@link ApplicationBundler} can be unpacked and its classes loaded from it.
+ */
+public class ApplicationBundlerTest {
+
+  @Rule
+  public TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Test
+  public void testFindDependencies() throws IOException, ClassNotFoundException {
+    Location location = new LocalLocationFactory(tmpDir.newFolder()).create("test.jar");
+
+    // Create a jar file by tracing dependencies
+    ApplicationBundler bundler = new ApplicationBundler(ImmutableList.<String>of());
+    bundler.createBundle(location, ApplicationBundler.class);
+
+    File targetDir = tmpDir.newFolder();
+    unjar(new File(location.toURI()), targetDir);
+
+    // Load the class back, it should be loaded by the custom classloader
+    ClassLoader classLoader = createClassLoader(targetDir);
+    Class<?> clz = classLoader.loadClass(ApplicationBundler.class.getName());
+    Assert.assertSame(classLoader, clz.getClassLoader());
+
+    // For system classes, they shouldn't be packaged, hence loaded by different classloader.
+    clz = classLoader.loadClass(Object.class.getName());
+    Assert.assertNotSame(classLoader, clz.getClassLoader());
+  }
+
+  private void unjar(File jarFile, File targetDir) throws IOException {
+    JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
+    try {
+      JarEntry jarEntry = jarInput.getNextJarEntry();
+      while (jarEntry != null) {
+        File target = new File(targetDir, jarEntry.getName());
+        if (jarEntry.isDirectory()) {
+          target.mkdirs();
+        } else {
+          target.getParentFile().mkdirs();
+          ByteStreams.copy(jarInput, Files.newOutputStreamSupplier(target));
+        }
+
+        jarEntry = jarInput.getNextJarEntry();
+      }
+    } finally {
+      jarInput.close();
+    }
+  }
+
+  private ClassLoader createClassLoader(File dir) throws MalformedURLException {
+    List<URL> urls = Lists.newArrayList();
+    urls.add(new File(dir, "classes").toURI().toURL());
+    File[] libFiles = new File(dir, "lib").listFiles();
+    if (libFiles != null) {
+      for (File file : libFiles) {
+        urls.add(file.toURI().toURL());
+      }
+    }
+    return new URLClassLoader(urls.toArray(new URL[0])) {
+      @Override
+      protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        // Child-first: try the given URLs before delegating to parent. NOTE(review): no findLoadedClass check.
+        try {
+          return super.findClass(name);
+        } catch (ClassNotFoundException e) {
+          ClassLoader parent = getParent();
+          return parent == null ? ClassLoader.getSystemClassLoader().loadClass(name) : parent.loadClass(name);
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
new file mode 100644
index 0000000..40fc3ed
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.kafka.client.Compression;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests publishing, consuming and offset lookup through {@link KafkaClient} against an embedded Kafka server.
+ */
+public class KafkaTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+  private static InMemoryZKServer zkServer;
+  private static EmbeddedKafkaServer kafkaServer;
+  private static ZKClientService zkClientService;
+  private static KafkaClient kafkaClient;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
+    zkServer.startAndWait();
+
+    // Extract the kafka.tgz and start the kafka server
+    kafkaServer = new EmbeddedKafkaServer(extractKafka(), generateKafkaConfig(zkServer.getConnectionStr()));
+    kafkaServer.startAndWait();
+
+    zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+
+    kafkaClient = new SimpleKafkaClient(zkClientService);
+    Services.chainStart(zkClientService, kafkaClient).get();
+  }
+
+  @AfterClass
+  public static void finish() throws Exception {
+    Services.chainStop(kafkaClient, zkClientService).get();
+    kafkaServer.stopAndWait();
+    zkServer.stopAndWait();
+  }
+
+  @Test
+  public void testKafkaClient() throws Exception {
+    String topic = "testClient";
+
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10);
+
+    t1.start();
+    t2.start();
+
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10);
+    t2.join();
+    t3.start();
+
+    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, 0, 1048576);
+    int count = 0;
+    long startTime = System.nanoTime();
+    while (count < 30 && consumer.hasNext() && secondsPassed(startTime, TimeUnit.NANOSECONDS) < 5) {
+      LOG.info(Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+      count++;
+    }
+
+    Assert.assertEquals(30, count);
+  }
+
+  @Test (timeout = 10000)
+  public void testOffset() throws Exception {
+    String topic = "testOffset";
+
+    // Initial earliest offset should be 0.
+    long[] offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
+    Assert.assertArrayEquals(new long[]{0L}, offsets);
+
+    // Publish some messages
+    Thread publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 2000);
+    publishThread.start();
+    publishThread.join();
+
+    // Fetch earliest offset, should still be 0.
+    offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
+    Assert.assertArrayEquals(new long[]{0L}, offsets);
+
+    // Fetch latest offset
+    offsets = kafkaClient.getOffset(topic, 0, -1, 10).get();
+    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, offsets[0], 1048576);
+
+    // Publish one more message, the consumer should see the new message being published.
+    publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 1, 3000);
+    publishThread.start();
+    publishThread.join();
+
+    // Should see the last message being published.
+    Assert.assertTrue(consumer.hasNext());
+    Assert.assertEquals("3000 Testing", Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+  }
+
+  private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
+                                     final Compression compression, final String message, final int count) {
+    return createPublishThread(kafkaClient, topic, compression, message, count, 0);
+  }
+
+  private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
+                                     final String message, final int count, final int base) {
+    return new Thread() {
+      public void run() {
+        PreparePublish preparePublish = kafkaClient.preparePublish(topic, compression);
+        for (int i = 0; i < count; i++) {
+          preparePublish.add(((base + i) + " " + message).getBytes(Charsets.UTF_8), 0);
+        }
+        Futures.getUnchecked(preparePublish.publish());
+      }
+    };
+  }
+
+  private long secondsPassed(long startTime, TimeUnit startUnit) {
+    return TimeUnit.SECONDS.convert(System.nanoTime() - TimeUnit.NANOSECONDS.convert(startTime, startUnit),
+                                    TimeUnit.NANOSECONDS);
+  }
+
+  private static File extractKafka() throws IOException, ArchiveException, CompressorException {
+    File kafkaExtract = TMP_FOLDER.newFolder();
+    InputStream kakfaResource = KafkaTest.class.getClassLoader().getResourceAsStream("kafka-0.7.2.tgz");
+    ArchiveInputStream archiveInput = new ArchiveStreamFactory()
+      .createArchiveInputStream(ArchiveStreamFactory.TAR,
+                                new CompressorStreamFactory()
+                                  .createCompressorInputStream(CompressorStreamFactory.GZIP, kakfaResource));
+
+    try {
+      ArchiveEntry entry = archiveInput.getNextEntry();
+      while (entry != null) {
+        File file = new File(kafkaExtract, entry.getName());
+        if (entry.isDirectory()) {
+          file.mkdirs();
+        } else {
+          ByteStreams.copy(archiveInput, Files.newOutputStreamSupplier(file));
+        }
+        entry = archiveInput.getNextEntry();
+      }
+    } finally {
+      archiveInput.close();
+    }
+    return kafkaExtract;
+  }
+
+  private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
+    int port = Networks.getRandomPort();
+    Preconditions.checkState(port > 0, "Failed to get random port.");
+
+    Properties prop = new Properties();
+    prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
+    prop.setProperty("zk.connect", zkConnectStr);
+    prop.setProperty("num.threads", "8");
+    prop.setProperty("port", Integer.toString(port));
+    prop.setProperty("log.flush.interval", "1000");
+    prop.setProperty("max.socket.request.bytes", "104857600");
+    prop.setProperty("log.cleanup.interval.mins", "1");
+    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
+    prop.setProperty("zk.connectiontimeout.ms", "1000000");
+    prop.setProperty("socket.receive.buffer", "1048576");
+    prop.setProperty("enable.zookeeper", "true");
+    prop.setProperty("log.retention.hours", "24");
+    prop.setProperty("brokerid", "0");
+    prop.setProperty("socket.send.buffer", "1048576");
+    prop.setProperty("num.partitions", "1");
+    // Use a really small file size to force some flush to happen
+    prop.setProperty("log.file.size", "1024");
+    prop.setProperty("log.default.flush.interval.ms", "1000");
+    return prop;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-core/src/test/resources/logback-test.xml b/twill-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..3c36660
--- /dev/null
+++ b/twill-core/src/test/resources/logback-test.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.apache.hadoop" level="WARN" />
+    <logger name="org.apache.zookeeper" level="WARN" />
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-api/pom.xml b/twill-discovery-api/pom.xml
new file mode 100644
index 0000000..e41b214
--- /dev/null
+++ b/twill-discovery-api/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-discovery-api</artifactId>
+    <name>Twill discovery service API</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
new file mode 100644
index 0000000..a5529fe
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.discovery;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Discoverable defines the attributes of a service to be discovered.
+ */
+public interface Discoverable {
+
+  /**
+   * @return Name of the service
+   */
+  String getName();
+
+  /**
+   * @return An {@link InetSocketAddress} representing the host+port of the service.
+   */
+  InetSocketAddress getSocketAddress();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
new file mode 100644
index 0000000..a26fff8
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * DiscoveryService defines the interface for registering {@link Discoverable} services.
+ */
+public interface DiscoveryService {
+
+  /**
+   * Registers a {@link Discoverable} service.
+   * @param discoverable Information of the service provider that could be discovered.
+   * @return A {@link Cancellable} for un-registration.
+   */
+  Cancellable register(Discoverable discoverable);
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
new file mode 100644
index 0000000..89cf269
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+/**
+ * Interface for discovering services registered with {@link DiscoveryService}.
+ */
+public interface DiscoveryServiceClient {
+
+  /**
+   * Retrieves a list of {@link Discoverable} for the service with the given name.
+   *
+   * @param name Name of the service
+   * @return A live {@link Iterable} that on each call to {@link Iterable#iterator()} returns
+   *         an {@link java.util.Iterator Iterator} that reflects the latest set of
+   *         available {@link Discoverable} services.
+   */
+  Iterable<Discoverable> discover(String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/pom.xml b/twill-discovery-core/pom.xml
new file mode 100644
index 0000000..2612138
--- /dev/null
+++ b/twill-discovery-core/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-discovery-core</artifactId>
+    <name>Twill discovery service implementations</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-discovery-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-zookeeper</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
new file mode 100644
index 0000000..5fa97d1
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Holds a copy of the name and socket address of a {@link Discoverable}, with value-based equality.
+ */
+final class DiscoverableWrapper implements Discoverable {
+  private final String name;
+  private final InetSocketAddress address;
+
+  DiscoverableWrapper(Discoverable discoverable) {
+    this.name = discoverable.getName();
+    this.address = discoverable.getSocketAddress();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public InetSocketAddress getSocketAddress() {
+    return address;
+  }
+
+  @Override
+  public String toString() {
+    return "{name=" + name + ", address=" + address + "}";  // Closing brace balances the opening one.
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    Discoverable other = (Discoverable) o;
+
+    return name.equals(other.getName()) && address.equals(other.getSocketAddress());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = name.hashCode();
+    result = 31 * result + address.hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
new file mode 100644
index 0000000..7a9e984
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+
+import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A simple in-memory implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ */
+public class InMemoryDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+
+  private final Multimap<String, Discoverable> services = HashMultimap.create();
+  private final Lock lock = new ReentrantLock();  // Held around every access to services in this class.
+
+  @Override
+  public Cancellable register(final Discoverable discoverable) {
+    lock.lock();
+    try {
+      final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+      services.put(wrapper.getName(), wrapper);
+      return new Cancellable() {
+        @Override
+        public void cancel() {
+          lock.lock();
+          try {
+            services.remove(wrapper.getName(), wrapper);
+          } finally {
+            lock.unlock();
+          }
+        }
+      };
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public Iterable<Discoverable> discover(final String name) {
+    return new Iterable<Discoverable>() {
+      @Override
+      public Iterator<Discoverable> iterator() {
+        lock.lock();
+        try {
+          return ImmutableList.copyOf(services.get(name)).iterator();  // Iterate over a snapshot copy.
+        } finally {
+          lock.unlock();
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
new file mode 100644
index 0000000..e2f9bc0
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.base.Charsets;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ * <p>
+ *   Discoverable services are registered within Zookeeper under the namespace 'discoverable' by default.
+ *   If you would like to change the namespace under which the services are registered then you can pass
+ *   in the namespace during construction of {@link ZKDiscoveryService}.
+ * </p>
+ *
+ * <p>
+ *   Following is a simple example of how {@link ZKDiscoveryService} can be used for registering services
+ *   and also for discovering the registered services.
+ *   <blockquote>
+ *    <pre>
+ *      {@code
+ *
+ *      ZKDiscoveryService service = new ZKDiscoveryService(zkClient);
+ *      service.register(new Discoverable() {
+ *        @Override
+ *        public String getName() {
+ *          return "service-name";
+ *        }
+ *
+ *        @Override
+ *        public InetSocketAddress getSocketAddress() {
+ *          return new InetSocketAddress(hostname, port);
+ *        }
+ *      });
+ *      ...
+ *      ...
+ *      Iterable<Discoverable> services = service.discover("service-name");
+ *      ...
+ *      }
+ *    </pre>
+ *   </blockquote>
+ * </p>
+ */
+public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
+  private static final String NAMESPACE = "/discoverable";
+
+  private static final long RETRY_MILLIS = 1000;
+
+  // In-memory map for recreating ephemeral nodes after the session expires.
+  // It maps from a discoverable to the corresponding Cancellable.
+  private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
+  private final Lock lock;
+
+  private final LoadingCache<String, Iterable<Discoverable>> services;
+  private final ZKClient zkClient;
+  private final ScheduledExecutorService retryExecutor;
+
+  /**
+   * Creates a discovery service that keeps its service registry under the default namespace.
+   *
+   * @param zkClient The {@link ZKClient} for interacting with zookeeper.
+   */
+  public ZKDiscoveryService(ZKClient zkClient) {
+    this(zkClient, NAMESPACE);
+  }
+
+  /**
+   * Creates a discovery service that keeps its service registry under the given namespace.
+   *
+   * @param zkClient The {@link ZKClient} for interacting with zookeeper.
+   * @param namespace The namespace under which registered services are stored in zookeeper.
+   *                  If it is {@code null}, the client is used as is, without any namespace.
+   */
+  public ZKDiscoveryService(ZKClient zkClient, String namespace) {
+    this.lock = new ReentrantLock();
+    this.discoverables = HashMultimap.create();
+    this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
+      Threads.createDaemonThreadFactory("zk-discovery-retry"));
+
+    if (namespace == null) {
+      this.zkClient = zkClient;
+    } else {
+      this.zkClient = ZKClients.namespace(zkClient, namespace);
+    }
+
+    this.services = CacheBuilder.newBuilder().build(createServiceLoader());
+
+    // Registered last, after all fields used by the watcher have been assigned.
+    this.zkClient.addConnectionWatcher(createConnectionWatcher());
+  }
+
+  /**
+   * Registers a {@link Discoverable} in zookeeper and blocks until the registration has completed.
+   * <p>
+   *   The discoverable is wrapped in a {@link DiscoverableWrapper} and written to an ephemeral node whose path
+   *   is derived from the service name and the socket address. If that node already exists, the registration
+   *   is treated as completed when the node is owned by the session of this client, it is retried periodically
+   *   while the node is owned by a different session (or has disappeared again), and it fails when the existing
+   *   node is not an ephemeral node.
+   * </p>
+   * @param discoverable Information of the service provider that could be discovered.
+   * @return An instance of {@link Cancellable} that un-registers the service when cancelled.
+   * @throws RuntimeException an unchecked exception raised by {@link Futures#getUnchecked} if the
+   *                          registration failed.
+   */
+  @Override
+  public Cancellable register(final Discoverable discoverable) {
+    final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+    final SettableFuture<String> future = SettableFuture.create();
+    final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper);
+
+    // Create the zk ephemeral node.
+    Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() {
+      @Override
+      public void onSuccess(String result) {
+        // Set the node path to cancellable for future cancellation.
+        cancellable.setPath(result);
+        lock.lock();
+        try {
+          discoverables.put(wrapper, cancellable);
+        } finally {
+          lock.unlock();
+        }
+        LOG.debug("Service registered: {} {}", wrapper, result);
+        future.set(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        if (t instanceof KeeperException.NodeExistsException) {
+          // Retry with the same wrapper that was used for the initial attempt and that the cancellable
+          // refers to, rather than with the caller supplied object.
+          handleRegisterFailure(wrapper, future, this, t);
+        } else {
+          LOG.warn("Failed to register: {}", wrapper, t);
+          future.setException(t);
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    // Block until the registration either completed or failed.
+    Futures.getUnchecked(future);
+    return cancellable;
+  }
+
+  /**
+   * Looks up the {@link Iterable} of discoverables for the given service name through the loading cache.
+   *
+   * @param service Name of the service to discover.
+   * @return The cached {@link Iterable} for the given service.
+   */
+  @Override
+  public Iterable<Discoverable> discover(String service) {
+    Iterable<Discoverable> instances = services.getUnchecked(service);
+    return instances;
+  }
+
+  /**
+   * Handles a registration failure by inspecting the node that is already present at the registration path.
+   * <p>
+   *   Depending on the outcome of the lookup, the creation is retried (node is gone, node is owned by a
+   *   different session, the session id of this client is unknown, or the lookup itself failed), the
+   *   registration is completed (node is owned by the session of this client), or the registration is
+   *   failed with the original cause (node is not an ephemeral node).
+   * </p>
+   *
+   * @param discoverable The discoverable to register.
+   * @param completion A settable future to set when registration is completed / failed.
+   * @param creationCallback A future callback for path creation.
+   * @param failureCause The original cause of failure.
+   */
+  private void handleRegisterFailure(final Discoverable discoverable,
+                                     final SettableFuture<String> completion,
+                                     final FutureCallback<String> creationCallback,
+                                     final Throwable failureCause) {
+
+    final String path = getNodePath(discoverable);
+    Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
+      @Override
+      public void onSuccess(Stat result) {
+        if (result == null) {
+          // If the node is gone, simply retry.
+          LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
+          retryRegister(discoverable, creationCallback);
+          return;
+        }
+
+        long ephemeralOwner = result.getEphemeralOwner();
+        if (ephemeralOwner == 0) {
+          // it is not an ephemeral node, something wrong.
+          LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.",
+                    path, discoverable);
+          completion.setException(failureCause);
+          return;
+        }
+        Long sessionId = zkClient.getSessionId();
+        if (sessionId == null || ephemeralOwner != sessionId) {
+          // This zkClient is not valid or doesn't own the ephemeral node, simply keep retrying.
+          LOG.info("Owner of {} is different. Retry registration for {}.", path, discoverable);
+          retryRegister(discoverable, creationCallback);
+        } else {
+          // This client owned the node, treat the registration as completed.
+          // This could happen if same client tries to register twice (due to mistake or failure race condition).
+          completion.set(path);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // If exists call failed, simply retry creation. The throwable is passed as the trailing argument so
+        // that the cause of the failed lookup is not lost from the log.
+        LOG.warn("Error when getting stats on {}. Retry registration for {}.", path, discoverable, t);
+        retryRegister(discoverable, creationCallback);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  private OperationFuture<String> doRegister(Discoverable discoverable) {
+    byte[] discoverableBytes = encode(discoverable);
+    return zkClient.create(getNodePath(discoverable), discoverableBytes, CreateMode.EPHEMERAL, true);
+  }
+
+  /**
+   * Schedules another creation attempt for the given discoverable after {@code RETRY_MILLIS} milliseconds.
+   *
+   * @param discoverable The discoverable to register.
+   * @param creationCallback The callback to attach to the new creation attempt.
+   */
+  private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
+    Runnable retryTask = new Runnable() {
+      @Override
+      public void run() {
+        OperationFuture<String> creation = doRegister(discoverable);
+        Futures.addCallback(creation, creationCallback, Threads.SAME_THREAD_EXECUTOR);
+      }
+    };
+    retryExecutor.schedule(retryTask, RETRY_MILLIS, TimeUnit.MILLISECONDS);
+  }
+
+
+  /**
+   * Builds the node path for a given {@link Discoverable}: the service name followed by the MD5 hash
+   * of the raw IP address bytes and the port of its socket address.
+   *
+   * @param discoverable An instance of {@link Discoverable}.
+   * @return A node path of the form {@code /<name>/<hash>}.
+   */
+  private String getNodePath(Discoverable discoverable) {
+    InetSocketAddress endpoint = discoverable.getSocketAddress();
+    byte[] rawAddress = endpoint.getAddress().getAddress();
+    int port = endpoint.getPort();
+
+    String nodeName = Hashing.md5().newHasher().putBytes(rawAddress).putInt(port).hash().toString();
+    return "/" + discoverable.getName() + "/" + nodeName;
+  }
+
+  private Watcher createConnectionWatcher() {
+    return new Watcher() {
+      // Watcher is invoked from single event thread, hence safe to use normal mutable variable.
+      private boolean expired;
+
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getState() == Event.KeeperState.Expired) {
+          LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
+          expired = true;
+        } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
+          LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
+          expired = false;
+
+          // Re-register all services
+          lock.lock();
+          try {
+            for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
+              LOG.info("Re-registering service: {}", entry.getKey());
+
+              // Must be non-blocking in here.
+              Futures.addCallback(doRegister(entry.getKey()), new FutureCallback<String>() {
+                @Override
+                public void onSuccess(String result) {
+                  // Updates the cancellable to the newly created sequential node.
+                  entry.getValue().setPath(result);
+                  LOG.debug("Service re-registered: {} {}", entry.getKey(), result);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                  // When failed to create the node, there would be no retry and simply make the cancellable do nothing.
+                  entry.getValue().setPath(null);
+                  LOG.error("Failed to re-register service: {}", entry.getKey(), t);
+                }
+              }, Threads.SAME_THREAD_EXECUTOR);
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+    };
+  }
+
+  /**
+   * Creates the {@link CacheLoader} that, for a given service name, starts watching the children of the
+   * service node and returns a live {@link Iterable} over the most recently observed set of instances.
+   */
+  private CacheLoader<String, Iterable<Discoverable>> createServiceLoader() {
+    return new CacheLoader<String, Iterable<Discoverable>>() {
+      @Override
+      public Iterable<Discoverable> load(String service) throws Exception {
+        // Holds an immutable snapshot of the latest detected instances; swapping it keeps the result live.
+        final AtomicReference<Iterable<Discoverable>> latest =
+          new AtomicReference<Iterable<Discoverable>>(ImmutableList.<Discoverable>of());
+
+        watchInstances("/" + service, latest);
+
+        return new Iterable<Discoverable>() {
+          @Override
+          public Iterator<Discoverable> iterator() {
+            return latest.get().iterator();
+          }
+        };
+      }
+    };
+  }
+
+  /**
+   * Watches the children of the given service node and stores a new snapshot into the given reference
+   * every time the children change.
+   */
+  private void watchInstances(final String serviceBase, final AtomicReference<Iterable<Discoverable>> latest) {
+    ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
+      @Override
+      public void updated(NodeChildren nodeChildren) {
+        // Issue the reads for all children up front so that they are fetched in parallel.
+        List<String> childNames = nodeChildren.getChildren();
+        List<OperationFuture<NodeData>> reads = Lists.newArrayListWithCapacity(childNames.size());
+        for (String childName : childNames) {
+          reads.add(zkClient.getData(serviceBase + "/" + childName));
+        }
+
+        // Publish the new snapshot once all reads are done.
+        final ListenableFuture<List<NodeData>> allReads = Futures.successfulAsList(reads);
+        allReads.addListener(new Runnable() {
+          @Override
+          public void run() {
+            latest.set(decodeAll(Futures.getUnchecked(allReads)));
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    });
+  }
+
+  /**
+   * Decodes every non-null node data into a {@link Discoverable}, skipping entries that decode to null.
+   */
+  private static ImmutableList<Discoverable> decodeAll(List<NodeData> nodes) {
+    ImmutableList.Builder<Discoverable> decoded = ImmutableList.builder();
+    for (NodeData node : nodes) {
+      if (node == null) {
+        // No data for this child.
+        continue;
+      }
+      Discoverable instance = decode(node.getData());
+      if (instance != null) {
+        decoded.add(instance);
+      }
+    }
+    return decoded.build();
+  }
+
+  /**
+   * Static helper function that turns UTF-8 encoded JSON bytes into a {@link Discoverable}.
+   * @param bytes the serialized discoverable, may be {@code null}
+   * @return {@code null} if bytes are {@code null}; else the result of parsing the JSON content
+   */
+  private static Discoverable decode(byte[] bytes) {
+    if (bytes != null) {
+      String json = new String(bytes, Charsets.UTF_8);
+      return new GsonBuilder()
+        .registerTypeAdapter(Discoverable.class, new DiscoverableCodec())
+        .create()
+        .fromJson(json, Discoverable.class);
+    }
+    return null;
+  }
+
+  /**
+   * Static helper function that serializes a {@link Discoverable} to JSON and returns its UTF-8 bytes.
+   * @param discoverable An instance of {@link Discoverable}
+   * @return array of bytes holding the JSON form of <code>discoverable</code>
+   */
+  private static byte[] encode(Discoverable discoverable) {
+    String json = new GsonBuilder()
+      .registerTypeAdapter(DiscoverableWrapper.class, new DiscoverableCodec())
+      .create()
+      .toJson(discoverable, DiscoverableWrapper.class);
+    return json.getBytes(Charsets.UTF_8);
+  }
+
+  /**
+   * Inner class for cancelling (un-registering) a discoverable that was registered through this service.
+   */
+  private final class DiscoveryCancellable implements Cancellable {
+
+    private final Discoverable discoverable;
+    private final AtomicBoolean cancelled;
+    // Path of the ephemeral node. It is null until a path has been set through setPath(), and it is
+    // set back to null when a re-registration failed.
+    private volatile String path;
+
+    DiscoveryCancellable(Discoverable discoverable) {
+      this.discoverable = discoverable;
+      this.cancelled = new AtomicBoolean();
+    }
+
+    /**
+     * Set the zk node path representing the ephemeral node of this registered discoverable.
+     * Called from ZK event thread when creating of the node completed, either from normal registration or
+     * re-registration due to session expiration.
+     *
+     * @param path The path to the ephemeral node, or {@code null} if there is no node.
+     */
+    void setPath(String path) {
+      this.path = path;
+      if (cancelled.get() && path != null) {
+        // Simply delete the path if it's already cancelled
+        // It's for the case when session expire happened and re-registration completed after this has been cancelled.
+        // Not bother with the result as if there is error, nothing much we could do.
+        zkClient.delete(path);
+      }
+    }
+
+    /**
+     * Un-registers the discoverable. Only the first call has an effect.
+     */
+    @Override
+    public void cancel() {
+      if (!cancelled.compareAndSet(false, true)) {
+        return;
+      }
+
+      // Remove this Cancellable from the map so that upon session expiration won't try to register.
+      // This is done before looking at the path: the path is also null after a failed re-registration,
+      // and in that case the entry would otherwise stay in the map and be registered again on the next
+      // session expiration even though it has been cancelled.
+      lock.lock();
+      try {
+        discoverables.remove(discoverable, this);
+      } finally {
+        lock.unlock();
+      }
+
+      // Take a snapshot of the volatile path.
+      String path = this.path;
+
+      // If it is null, there is no node to delete right now. If a node creation is still in flight,
+      // setPath() will be called in future (through zk callback when creation is completed)
+      // so that deletion will be done in setPath().
+      if (path == null) {
+        return;
+      }
+
+      // Delete the path. It's ok if the path not exists
+      // (e.g. what session expired and before node has been re-created)
+      Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
+                                                    KeeperException.NoNodeException.class, path));
+      LOG.debug("Service unregistered: {} {}", discoverable, path);
+    }
+  }
+
+  /**
+   * Gson codec that writes a {@link Discoverable} as a JSON object with the fields {@code service},
+   * {@code hostname} and {@code port}, and reads such an object back into a {@link Discoverable}.
+   */
+  private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
+
+    @Override
+    public Discoverable deserialize(JsonElement json, Type typeOfT,
+                                    JsonDeserializationContext context) throws JsonParseException {
+      JsonObject fields = json.getAsJsonObject();
+      final String name = fields.get("service").getAsString();
+      // The socket address is created once here and shared by all calls on the returned object.
+      final InetSocketAddress socketAddress = new InetSocketAddress(fields.get("hostname").getAsString(),
+                                                                    fields.get("port").getAsInt());
+      return new Discoverable() {
+        @Override
+        public String getName() {
+          return name;
+        }
+
+        @Override
+        public InetSocketAddress getSocketAddress() {
+          return socketAddress;
+        }
+      };
+    }
+
+    @Override
+    public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
+      String name = src.getName();
+      String hostname = src.getSocketAddress().getHostName();
+      int port = src.getSocketAddress().getPort();
+
+      JsonObject result = new JsonObject();
+      result.addProperty("service", name);
+      result.addProperty("hostname", hostname);
+      result.addProperty("port", port);
+      return result;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
new file mode 100644
index 0000000..a1d6e0c
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes in this package provide service discovery implementations.
+ */
+package org.apache.twill.discovery;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
new file mode 100644
index 0000000..d8cc375
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test memory based service discovery service.
+ */
+public class InMemoryDiscoveryServiceTest {
+
+  /**
+   * Registers a discoverable with the given name that is reachable at the given host and port.
+   *
+   * @return the {@link Cancellable} returned by the discovery service for this registration
+   */
+  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+    return service.register(new Discoverable() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return new InetSocketAddress(host, port);
+      }
+    });
+  }
+
+  /**
+   * Registers a single endpoint, verifies that it can be discovered, and verifies that it is gone after
+   * the registration has been cancelled.
+   */
+  @Test
+  public void simpleDiscoverable() throws Exception {
+    DiscoveryService discoveryService = new InMemoryDiscoveryService();
+    DiscoveryServiceClient discoveryServiceClient = (DiscoveryServiceClient) discoveryService;
+
+    // Register one service running on one host:port
+    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
+
+    // Discover that registered host:port. assertEquals reports the actual size when the check fails.
+    Assert.assertEquals(1, Iterables.size(discoverables));
+
+    // Remove the service
+    cancellable.cancel();
+
+    // There should be no service.
+    discoverables = discoveryServiceClient.discover("foo");
+    // NOTE(review): the in-memory service removes the entry inside cancel(), so this pause looks
+    // unnecessary; confirm before removing it (the TimeUnit import would then be unused).
+    TimeUnit.MILLISECONDS.sleep(100);
+    Assert.assertEquals(0, Iterables.size(discoverables));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
new file mode 100644
index 0000000..feee8db
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Services;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.internal.zookeeper.KillZKSession;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test Zookeeper based discovery service.
+ */
+public class ZKDiscoveryServiceTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryServiceTest.class);
+
+  private static InMemoryZKServer zkServer;
+  private static ZKClientService zkClient;
+
+  @BeforeClass
+  public static void beforeClass() {
+    zkServer = InMemoryZKServer.builder().setTickTime(100000).build();
+    zkServer.startAndWait();
+
+    zkClient = ZKClientServices.delegate(
+      ZKClients.retryOnFailure(
+        ZKClients.reWatchOnExpire(
+          ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+        RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+    zkClient.startAndWait();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    Futures.getUnchecked(Services.chainStop(zkClient, zkServer));
+  }
+
+  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+    return service.register(new Discoverable() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return new InetSocketAddress(host, port);
+      }
+    });
+  }
+
+
+  private boolean waitTillExpected(int expected, Iterable<Discoverable> discoverables) throws Exception {
+    for (int i = 0; i < 10; ++i) {
+      TimeUnit.MILLISECONDS.sleep(10);
+      if (Iterables.size(discoverables) == expected) {
+        return true;
+      }
+    }
+    return (Iterables.size(discoverables) == expected);
+  }
+
+  @Test (timeout = 5000)
+  public void testDoubleRegister() throws Exception {
+    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = discoveryService;
+
+    // Register on the same host port, it shouldn't fail.
+    Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
+    Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_double_reg");
+
+    Assert.assertTrue(waitTillExpected(1, discoverables));
+
+    cancellable.cancel();
+    cancellable2.cancel();
+
+    // Register again with two different clients, but killing session of the first one.
+    final ZKClientService zkClient2 = ZKClientServices.delegate(
+      ZKClients.retryOnFailure(
+        ZKClients.reWatchOnExpire(
+          ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+        RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+    zkClient2.startAndWait();
+
+    try {
+      ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
+      cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
+
+      // Schedule a thread to shutdown zkClient2.
+      new Thread() {
+        @Override
+        public void run() {
+          try {
+            TimeUnit.SECONDS.sleep(2);
+            zkClient2.stopAndWait();
+          } catch (InterruptedException e) {
+            LOG.error(e.getMessage(), e);
+          }
+        }
+      }.start();
+
+      // This call would block until zkClient2 is shutdown.
+      cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
+      cancellable.cancel();
+
+    } finally {
+      zkClient2.stopAndWait();
+    }
+  }
+
+  /**
+   * Checks that discovery state stays consistent across a killed ZooKeeper session: an endpoint
+   * registered before the kill and one registered after it must both become visible, and both
+   * must disappear once cancelled.
+   */
+  @Test
+  public void testSessionExpires() throws Exception {
+    ZKDiscoveryService service = new ZKDiscoveryService(zkClient);
+    // The very same instance is used for registering and for discovering.
+    DiscoveryServiceClient client = service;
+
+    Cancellable firstRegistration = register(service, "test_expires", "localhost", 54321);
+    Iterable<Discoverable> endpoints = client.discover("test_expires");
+
+    // The first endpoint has to be visible before the session is killed.
+    Assert.assertTrue(waitTillExpected(1, endpoints));
+
+    KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 5000);
+
+    // Adding a second endpoint afterwards lets us confirm the state is up to date after reconnecting.
+    Cancellable secondRegistration = register(service, "test_expires", "localhost", 54322);
+
+    // Seeing two endpoints means the first one was re-registered upon reconnection.
+    Assert.assertTrue(waitTillExpected(2, endpoints));
+
+    firstRegistration.cancel();
+    secondRegistration.cancel();
+
+    // After cancelling both registrations nothing should be discoverable.
+    Assert.assertTrue(waitTillExpected(0, endpoints));
+  }
+
+  /**
+   * Registers a single endpoint through one service instance and checks, through a second
+   * instance sharing the same ZooKeeper client, that it appears and then vanishes on cancel.
+   */
+  @Test
+  public void simpleDiscoverable() throws Exception {
+    DiscoveryService registrar = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient client = new ZKDiscoveryService(zkClient);
+
+    // One service on one host:port.
+    Cancellable registration = register(registrar, "foo", "localhost", 8090);
+
+    // The registered endpoint must be discoverable.
+    Assert.assertTrue(waitTillExpected(1, client.discover("foo")));
+
+    // Take the service away again.
+    registration.cancel();
+
+    // A fresh lookup must come back empty.
+    Assert.assertTrue(waitTillExpected(0, client.discover("foo")));
+  }
+
+  @Test
+  public void manySameDiscoverable() throws Exception {
+    List<Cancellable> cancellables = Lists.newArrayList();
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
+
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("manyDiscoverable");
+    Assert.assertTrue(waitTillExpected(5, discoverables));
+
+    for (int i = 0; i < 5; i++) {
+      cancellables.get(i).cancel();
+      Assert.assertTrue(waitTillExpected(4 - i, discoverables));
+    }
+  }
+
+  /**
+   * Registers endpoints under three different service names (5, 3 and 2 endpoints) and checks
+   * that each name is discovered independently, that a previously obtained result reflects a
+   * later registration, and that everything is gone after all registrations are cancelled.
+   */
+  @Test
+  public void multiServiceDiscoverable() throws Exception {
+    List<Cancellable> registrations = Lists.newArrayList();
+    DiscoveryService registrar = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient client = new ZKDiscoveryService(zkClient);
+
+    for (int port = 1; port <= 5; port++) {
+      registrations.add(register(registrar, "service1", "localhost", port));
+    }
+    for (int port = 1; port <= 3; port++) {
+      registrations.add(register(registrar, "service2", "localhost", port));
+    }
+    for (int port = 1; port <= 2; port++) {
+      registrations.add(register(registrar, "service3", "localhost", port));
+    }
+
+    Assert.assertTrue(waitTillExpected(5, client.discover("service1")));
+    Assert.assertTrue(waitTillExpected(3, client.discover("service2")));
+
+    Iterable<Discoverable> service3Endpoints = client.discover("service3");
+    Assert.assertTrue(waitTillExpected(2, service3Endpoints));
+
+    // The Iterable obtained above is live: it must pick up an endpoint registered afterwards.
+    registrations.add(register(registrar, "service3", "localhost", 3));
+    Assert.assertTrue(waitTillExpected(3, service3Endpoints));
+
+    for (Cancellable registration : registrations) {
+      registration.cancel();
+    }
+
+    for (String service : new String[] {"service1", "service2", "service3"}) {
+      Assert.assertTrue(waitTillExpected(0, client.discover(service)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/resources/logback-test.xml b/twill-discovery-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..2615cb4
--- /dev/null
+++ b/twill-discovery-core/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Logback configuration used when running the twill-discovery-core tests
+  (this file lives under src/test/resources as logback-test.xml).
+-->
+<configuration>
+    <!-- Single console appender; every logger below writes through it. -->
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <!-- ISO8601 timestamp - level [thread:SimpleClassName@line] - message -->
+            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- Twill's own classes log at DEBUG; everything else falls back to the root level. -->
+    <logger name="org.apache.twill" level="DEBUG" />
+
+    <!-- Third-party libraries are limited to WARN and above. -->
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
new file mode 100644
index 0000000..b11bc7a
--- /dev/null
+++ b/twill-yarn/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Maven build descriptor for the twill-yarn module (the YARN integration of Twill). -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-yarn</artifactId>
+    <name>Twill Apache Hadoop YARN library</name>
+
+    <properties>
+        <!-- Where compiled classes go by default; the hadoop-2.0 profile below overrides it. -->
+        <output.dir>target/classes</output.dir>
+    </properties>
+
+    <!--
+      Third-party dependencies carry no version or scope here.
+      NOTE(review): presumably both are managed by twill-parent's dependencyManagement; verify
+      there that hadoop-minicluster and junit are restricted to test scope.
+    -->
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-discovery-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <!-- Resolved from the output.dir property so that a profile can redirect the class output. -->
+        <outputDirectory>${output.dir}</outputDirectory>
+    </build>
+
+    <!--
+      Hadoop version profiles. hadoop20.output.dir is not defined in this file.
+      NOTE(review): presumably it is declared in twill-parent; confirm.
+    -->
+    <profiles>
+        <profile>
+            <!-- Building against Hadoop 2.0: write the compiled classes to the dedicated hadoop20 directory. -->
+            <id>hadoop-2.0</id>
+            <properties>
+                <output.dir>${hadoop20.output.dir}</output.dir>
+            </properties>
+        </profile>
+        <profile>
+            <!--
+              Building against Hadoop 2.1: treat the hadoop20 output directory as an extra resource
+              directory (next to src/main/resources), so its content is packaged with this build.
+            -->
+            <id>hadoop-2.1</id>
+            <build>
+                <resources>
+                    <resource>
+                        <directory>${hadoop20.output.dir}</directory>
+                    </resource>
+                    <resource>
+                        <directory>src/main/resources</directory>
+                    </resource>
+                </resources>
+            </build>
+        </profile>
+        <profile>
+            <!-- Building against Hadoop 2.2: same resource layout as the hadoop-2.1 profile. -->
+            <id>hadoop-2.2</id>
+            <build>
+                <resources>
+                    <resource>
+                        <directory>${hadoop20.output.dir}</directory>
+                    </resource>
+                    <resource>
+                        <directory>src/main/resources</directory>
+                    </resource>
+                </resources>
+            </build>
+        </profile>
+    </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
new file mode 100644
index 0000000..d98dee1
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import org.apache.twill.internal.yarn.ports.AMRMClient;
+import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
+import org.apache.twill.internal.yarn.ports.AllocationResponse;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A {@link YarnAMClient} for the Hadoop 2.0 YARN API, implemented on top of the {@link AMRMClient}
+ * from the {@code ports} package.
+ * <p>
+ * The application master container id is read from the process environment at construction time.
+ * {@link #setTracker(InetSocketAddress, URL)} must be called before the service is started, since
+ * start-up registers the application master using the tracker address and URL. Container requests
+ * may only be added after start-up, because resource limits are learned from the registration response.
+ * </p>
+ */
+public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
+
+  /** Wraps a YARN {@link ContainerStatus} into the version independent {@link YarnContainerStatus}. */
+  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM =
+    new Function<ContainerStatus, YarnContainerStatus>() {
+      @Override
+      public YarnContainerStatus apply(ContainerStatus status) {
+        return new Hadoop20YarnContainerStatus(status);
+      }
+    };
+
+  private final ContainerId containerId;
+  /** Outstanding container requests, keyed by the id handed out from {@code ContainerRequestBuilder.apply()}. */
+  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+  private final AMRMClient amrmClient;
+  private final YarnNMClient nmClient;
+  private InetSocketAddress trackerAddr;
+  private URL trackerUrl;
+  /** Resource limits reported by the resource manager; populated in {@link #startUp()}. */
+  private Resource maxCapability;
+  private Resource minCapability;
+
+  /**
+   * Creates a client for the application master running in the current process.
+   *
+   * @param conf the Hadoop configuration used to initialize the resource manager and node manager clients
+   * @throws IllegalArgumentException if the {@link ApplicationConstants#AM_CONTAINER_ID_ENV} environment
+   *                                  variable is not set
+   */
+  public Hadoop20YarnAMClient(Configuration conf) {
+    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+
+    this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
+    this.amrmClient.init(conf);
+    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
+  }
+
+  /**
+   * Starts the resource manager client, registers the application master and records the
+   * minimum / maximum resource capability returned by the registration.
+   *
+   * @throws NullPointerException if {@link #setTracker(InetSocketAddress, URL)} has not been called
+   */
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+    amrmClient.start();
+
+    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
+                                                                                      trackerAddr.getPort(),
+                                                                                      trackerUrl.toString());
+    maxCapability = response.getMaximumResourceCapability();
+    minCapability = response.getMinimumResourceCapability();
+  }
+
+  /**
+   * Unregisters the application master with a {@link FinalApplicationStatus#SUCCEEDED} status and
+   * stops the resource manager client. The client is stopped even if unregistering fails.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    try {
+      amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+    } finally {
+      // Always release the client; a failed unregister must not leave it running.
+      amrmClient.stop();
+    }
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * Returns the value of the {@link ApplicationConstants#NM_HOST_ENV} environment variable,
+   * or {@code null} if it is not set.
+   */
+  @Override
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
+  }
+
+  /**
+   * Sets the tracker address and URL used when registering and unregistering the application master.
+   * Must be called before the service is started.
+   */
+  @Override
+  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  /**
+   * Performs one allocate call against the resource manager and dispatches the outcome to the handler.
+   * Newly allocated containers are offered through {@code handler.acquired}; any container on which the
+   * handler launched nothing is released again. Completed container statuses, if any, are reported
+   * through {@code handler.completed}.
+   *
+   * @param progress progress value passed on to the resource manager
+   * @param handler callback receiving acquired launchers and completed container statuses
+   */
+  @Override
+  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    AllocationResponse response = amrmClient.allocate(progress);
+    List<ProcessLauncher<YarnContainerInfo>> launchers
+      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+
+    for (Container container : response.getAllocatedContainers()) {
+      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
+    }
+
+    if (!launchers.isEmpty()) {
+      handler.acquired(launchers);
+
+      // If no process has been launched through the given launcher, return the container.
+      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+        // This cast always works since only RunnableProcessLauncher instances are added above.
+        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+        if (!launcher.isLaunched()) {
+          Container container = launcher.getContainerInfo().getContainer();
+          LOG.info("Nothing to run in container, releasing it: {}", container);
+          amrmClient.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+
+    List<YarnContainerStatus> completed = ImmutableList.copyOf(
+      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+    if (!completed.isEmpty()) {
+      handler.completed(completed);
+    }
+  }
+
+  /**
+   * Same as {@link #addContainerRequest(Resource, int)} with a count of one.
+   */
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  /**
+   * Creates a builder for requesting {@code count} containers. The given capability is adjusted in place
+   * to the limits reported by the resource manager, so this must only be called after start-up.
+   * Calling {@code apply()} on the returned builder submits one request per container and returns an id
+   * that can later be passed to {@link #completeContainerRequest(String)}.
+   *
+   * @param capability requested resources; modified by this call if it falls outside the allowed limits
+   * @param count number of containers to request
+   */
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (Hadoop20YarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          // Empty host / rack collections are passed on as null.
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          // One single-container request per container, all tracked under the same id.
+          for (int i = 0; i < count; i++) {
+            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
+                                                                                  priority, 1);
+            containerRequests.put(id, request);
+            amrmClient.addContainerRequest(request);
+          }
+
+          return id;
+        }
+      }
+    };
+  }
+
+  /**
+   * Withdraws every container request that was submitted under the given id.
+   * Does nothing if the id is unknown.
+   */
+  @Override
+  public synchronized void completeContainerRequest(String id) {
+    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+      amrmClient.removeContainerRequest(request);
+    }
+  }
+
+  /**
+   * Adjusts the given resource in place so that it respects the capability limits of the cluster.
+   * Virtual cores are clamped to the [min, max] range; memory is capped at the maximum and then
+   * rounded up to a multiple of the minimum.
+   *
+   * @param resource the requested resource; mutated when an adjustment is needed
+   * @return the same {@code resource} instance
+   */
+  private Resource adjustCapability(Resource resource) {
+    int cores = YarnUtils.getVirtualCores(resource);
+    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
+                                YarnUtils.getVirtualCores(minCapability));
+    // Try to set the virtual cores; older versions of YARN don't support this.
+    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+    }
+
+    // Remember the requested value up front so that the log below reports the real "from" value.
+    int requestedMemory = resource.getMemory();
+    int updatedMemory = Math.min(requestedMemory, maxCapability.getMemory());
+    int minMemory = minCapability.getMemory();
+    // NOTE(review): rounding up can exceed the maximum when it is not a multiple of the minimum.
+    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+
+    if (requestedMemory != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      LOG.info("Adjust memory requirement from {} to {} MB.", requestedMemory, updatedMemory);
+    }
+
+    return resource;
+  }
+}