You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:57 UTC

[15/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-core/src/test/resources/logback-test.xml b/twill-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..3c36660
--- /dev/null
+++ b/twill-core/src/test/resources/logback-test.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.apache.hadoop" level="WARN" />
+    <logger name="org.apache.zookeeper" level="WARN" />
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-api/pom.xml b/twill-discovery-api/pom.xml
new file mode 100644
index 0000000..e41b214
--- /dev/null
+++ b/twill-discovery-api/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-discovery-api</artifactId>
+    <name>Twill discovery service API</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
new file mode 100644
index 0000000..a5529fe
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.discovery;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Discoverable defines the attributes of service to be discovered.
+ */
+public interface Discoverable {
+
+  /**
+   * @return Name of the service
+   */
+  String getName();
+
+  /**
+   * @return An {@link InetSocketAddress} representing the host+port of the service.
+   */
+  InetSocketAddress getSocketAddress();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
new file mode 100644
index 0000000..a26fff8
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * DiscoveryService defines interface for registering {@link Discoverable}.
+ */
+public interface DiscoveryService {
+
+  /**
+   * Registers a {@link Discoverable} service.
+   * @param discoverable Information of the service provider that could be discovered.
+   * @return A {@link Cancellable} for un-registration.
+   */
+  Cancellable register(Discoverable discoverable);
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
new file mode 100644
index 0000000..89cf269
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+/**
+ * Interface for {@link DiscoveryServiceClient} to discover services registered with {@link DiscoveryService}.
+ */
+public interface DiscoveryServiceClient {
+
+  /**
+   * Retrieves a list of {@link Discoverable} for the a service with the given name.
+   *
+   * @param name Name of the service
+   * @return A live {@link Iterable} that on each call to {@link Iterable#iterator()} returns
+   *         an {@link java.util.Iterator Iterator} that reflects the latest set of
+   *         available {@link Discoverable} services.
+   */
+  Iterable<Discoverable> discover(String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/pom.xml b/twill-discovery-core/pom.xml
new file mode 100644
index 0000000..2612138
--- /dev/null
+++ b/twill-discovery-core/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-discovery-core</artifactId>
+    <name>Twill discovery service implementations</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-discovery-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-zookeeper</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
new file mode 100644
index 0000000..5fa97d1
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Wrapper for a discoverable.
+ */
+final class DiscoverableWrapper implements Discoverable {
+  private final String name;
+  private final InetSocketAddress address;
+
+  DiscoverableWrapper(Discoverable discoverable) {
+    this.name = discoverable.getName();
+    this.address = discoverable.getSocketAddress();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public InetSocketAddress getSocketAddress() {
+    return address;
+  }
+
+  @Override
+  public String toString() {
+    return "{name=" + name + ", address=" + address;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    Discoverable other = (Discoverable) o;
+
+    return name.equals(other.getName()) && address.equals(other.getSocketAddress());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = name.hashCode();
+    result = 31 * result + address.hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
new file mode 100644
index 0000000..7a9e984
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+
+import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A simple in memory implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ */
+public class InMemoryDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+
+  private final Multimap<String, Discoverable> services = HashMultimap.create();
+  private final Lock lock = new ReentrantLock();
+
+  @Override
+  public Cancellable register(final Discoverable discoverable) {
+    lock.lock();
+    try {
+      final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+      services.put(wrapper.getName(), wrapper);
+      return new Cancellable() {
+        @Override
+        public void cancel() {
+          lock.lock();
+          try {
+            services.remove(wrapper.getName(), wrapper);
+          } finally {
+            lock.unlock();
+          }
+        }
+      };
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public Iterable<Discoverable> discover(final String name) {
+    return new Iterable<Discoverable>() {
+      @Override
+      public Iterator<Discoverable> iterator() {
+        lock.lock();
+        try {
+          return ImmutableList.copyOf(services.get(name)).iterator();
+        } finally {
+          lock.unlock();
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
new file mode 100644
index 0000000..e2f9bc0
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.base.Charsets;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ * <p>
+ *   Discoverable services are registered within Zookeeper under the namespace 'discoverable' by default.
+ *   If you would like to change the namespace under which the services are registered then you can pass
+ *   in the namespace during construction of {@link ZKDiscoveryService}.
+ * </p>
+ *
+ * <p>
+ *   Following is a simple example of how {@link ZKDiscoveryService} can be used for registering services
+ *   and also for discovering the registered services.
+ *   <blockquote>
+ *    <pre>
+ *      {@code
+ *
+ *      DiscoveryService service = new ZKDiscoveryService(zkClient);
+ *      service.register(new Discoverable() {
+ *        @Override
+ *        public String getName() {
+ *          return 'service-name';
+ *        }
+ *
+ *        @Override
+ *        public InetSocketAddress getSocketAddress() {
+ *          return new InetSocketAddress(hostname, port);
+ *        }
+ *      });
+ *      ...
+ *      ...
+ *      Iterable<Discoverable> services = service.discovery("service-name");
+ *      ...
+ *      }
+ *    </pre>
+ *   </blockquote>
+ * </p>
+ */
+public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
+  private static final String NAMESPACE = "/discoverable";
+
+  private static final long RETRY_MILLIS = 1000;
+
+  // In memory map for recreating ephemeral nodes after session expires.
+  // It map from discoverable to the corresponding Cancellable
+  private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
+  private final Lock lock;
+
+  private final LoadingCache<String, Iterable<Discoverable>> services;
+  private final ZKClient zkClient;
+  private final ScheduledExecutorService retryExecutor;
+
+  /**
+   * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry.
+   * @param zkClient The {@link ZKClient} for interacting with zookeeper.
+   */
+  public ZKDiscoveryService(ZKClient zkClient) {
+    this(zkClient, NAMESPACE);
+  }
+
+  /**
+   * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry under namepsace.
+   * @param zkClient of zookeeper quorum
+   * @param namespace under which the service registered would be stored in zookeeper.
+   *                  If namespace is {@code null}, no namespace will be used.
+   */
+  public ZKDiscoveryService(ZKClient zkClient, String namespace) {
+    this.discoverables = HashMultimap.create();
+    this.lock = new ReentrantLock();
+    this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
+      Threads.createDaemonThreadFactory("zk-discovery-retry"));
+    this.zkClient = namespace == null ? zkClient : ZKClients.namespace(zkClient, namespace);
+    this.services = CacheBuilder.newBuilder().build(createServiceLoader());
+    this.zkClient.addConnectionWatcher(createConnectionWatcher());
+  }
+
+  /**
+   * Registers a {@link Discoverable} in zookeeper.
+   * <p>
+   *   Registering a {@link Discoverable} will create a node <base>/<service-name>
+   *   in zookeeper as a ephemeral node. If the node already exists (timeout associated with emphemeral, then a runtime
+   *   exception is thrown to make sure that a service with an intent to register is not started without registering.
+   *   When a runtime is thrown, expectation is that the process being started with fail and would be started again
+   *   by the monitoring service.
+   * </p>
+   * @param discoverable Information of the service provider that could be discovered.
+   * @return An instance of {@link Cancellable}
+   */
+  @Override
+  public Cancellable register(final Discoverable discoverable) {
+    final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+    final SettableFuture<String> future = SettableFuture.create();
+    final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper);
+
+    // Create the zk ephemeral node.
+    Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() {
+      @Override
+      public void onSuccess(String result) {
+        // Set the sequence node path to cancellable for future cancellation.
+        cancellable.setPath(result);
+        lock.lock();
+        try {
+          discoverables.put(wrapper, cancellable);
+        } finally {
+          lock.unlock();
+        }
+        LOG.debug("Service registered: {} {}", wrapper, result);
+        future.set(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        if (t instanceof KeeperException.NodeExistsException) {
+          handleRegisterFailure(discoverable, future, this, t);
+        } else {
+          LOG.warn("Failed to register: {}", wrapper, t);
+          future.setException(t);
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    Futures.getUnchecked(future);
+    return cancellable;
+  }
+
+  @Override
+  public Iterable<Discoverable> discover(String service) {
+    return services.getUnchecked(service);
+  }
+
+  /**
+   * Handle registration failure.
+   *
+   * @param discoverable The discoverable to register.
+   * @param completion A settable future to set when registration is completed / failed.
+   * @param creationCallback A future callback for path creation.
+   * @param failureCause The original cause of failure.
+   */
+  private void handleRegisterFailure(final Discoverable discoverable,
+                                     final SettableFuture<String> completion,
+                                     final FutureCallback<String> creationCallback,
+                                     final Throwable failureCause) {
+
+    final String path = getNodePath(discoverable);
+    Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
+      @Override
+      public void onSuccess(Stat result) {
+        if (result == null) {
+          // If the node is gone, simply retry.
+          LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
+          retryRegister(discoverable, creationCallback);
+          return;
+        }
+
+        long ephemeralOwner = result.getEphemeralOwner();
+        if (ephemeralOwner == 0) {
+          // it is not an ephemeral node, something wrong.
+          LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.",
+                    path, discoverable);
+          completion.setException(failureCause);
+          return;
+        }
+        Long sessionId = zkClient.getSessionId();
+        if (sessionId == null || ephemeralOwner != sessionId) {
+          // This zkClient is not valid or doesn't own the ephemeral node, simply keep retrying.
+          LOG.info("Owner of {} is different. Retry registration for {}.", path, discoverable);
+          retryRegister(discoverable, creationCallback);
+        } else {
+          // This client owned the node, treat the registration as completed.
+          // This could happen if same client tries to register twice (due to mistake or failure race condition).
+          completion.set(path);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // If exists call failed, simply retry creation.
+        LOG.warn("Error when getting stats on {}. Retry registration for {}.", path, discoverable);
+        retryRegister(discoverable, creationCallback);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  private OperationFuture<String> doRegister(Discoverable discoverable) {
+    byte[] discoverableBytes = encode(discoverable);
+    return zkClient.create(getNodePath(discoverable), discoverableBytes, CreateMode.EPHEMERAL, true);
+  }
+
+  private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
+    retryExecutor.schedule(new Runnable() {
+
+      @Override
+      public void run() {
+        Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);
+      }
+    }, RETRY_MILLIS, TimeUnit.MILLISECONDS);
+  }
+
+
+  /**
+   * Generate unique node path for a given {@link Discoverable}.
+   * @param discoverable An instance of {@link Discoverable}.
+   * @return A node name based on the discoverable.
+   */
+  private String getNodePath(Discoverable discoverable) {
+    InetSocketAddress socketAddress = discoverable.getSocketAddress();
+    String node = Hashing.md5()
+                         .newHasher()
+                         .putBytes(socketAddress.getAddress().getAddress())
+                         .putInt(socketAddress.getPort())
+                         .hash().toString();
+
+    return String.format("/%s/%s", discoverable.getName(), node);
+  }
+
+  private Watcher createConnectionWatcher() {
+    return new Watcher() {
+      // Watcher is invoked from single event thread, hence safe to use normal mutable variable.
+      private boolean expired;
+
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getState() == Event.KeeperState.Expired) {
+          LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
+          expired = true;
+        } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
+          LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
+          expired = false;
+
+          // Re-register all services
+          lock.lock();
+          try {
+            for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
+              LOG.info("Re-registering service: {}", entry.getKey());
+
+              // Must be non-blocking in here.
+              Futures.addCallback(doRegister(entry.getKey()), new FutureCallback<String>() {
+                @Override
+                public void onSuccess(String result) {
+                  // Updates the cancellable to the newly created sequential node.
+                  entry.getValue().setPath(result);
+                  LOG.debug("Service re-registered: {} {}", entry.getKey(), result);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                  // When failed to create the node, there would be no retry and simply make the cancellable do nothing.
+                  entry.getValue().setPath(null);
+                  LOG.error("Failed to re-register service: {}", entry.getKey(), t);
+                }
+              }, Threads.SAME_THREAD_EXECUTOR);
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+    };
+  }
+
+  /**
+   * Creates a CacheLoader for creating live Iterable for watching instances changes for a given service.
+   */
+  private CacheLoader<String, Iterable<Discoverable>> createServiceLoader() {
+    return new CacheLoader<String, Iterable<Discoverable>>() {
+      @Override
+      public Iterable<Discoverable> load(String service) throws Exception {
+        // The atomic reference is to keep the resulting Iterable live. It always contains a
+        // immutable snapshot of the latest detected set of Discoverable.
+        final AtomicReference<Iterable<Discoverable>> iterable =
+              new AtomicReference<Iterable<Discoverable>>(ImmutableList.<Discoverable>of());
+        final String serviceBase = "/" + service;
+
+        // Watch for children changes in /service
+        ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
+          @Override
+          public void updated(NodeChildren nodeChildren) {
+            // Fetch data of all children nodes in parallel.
+            List<String> children = nodeChildren.getChildren();
+            List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size());
+            for (String child : children) {
+              dataFutures.add(zkClient.getData(serviceBase + "/" + child));
+            }
+
+            // Update the service map when all fetching are done.
+            final ListenableFuture<List<NodeData>> fetchFuture = Futures.successfulAsList(dataFutures);
+            fetchFuture.addListener(new Runnable() {
+              @Override
+              public void run() {
+                ImmutableList.Builder<Discoverable> builder = ImmutableList.builder();
+                for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) {
+                  // For successful fetch, decode the content.
+                  if (nodeData != null) {
+                    Discoverable discoverable = decode(nodeData.getData());
+                    if (discoverable != null) {
+                      builder.add(discoverable);
+                    }
+                  }
+                }
+                iterable.set(builder.build());
+              }
+            }, Threads.SAME_THREAD_EXECUTOR);
+          }
+        });
+
+        return new Iterable<Discoverable>() {
+          @Override
+          public Iterator<Discoverable> iterator() {
+            return iterable.get().iterator();
+          }
+        };
+      }
+    };
+  }
+
+  /**
+   * Static helper function for decoding array of bytes into a {@link DiscoverableWrapper} object.
+   * @param bytes representing serialized {@link DiscoverableWrapper}
+   * @return null if bytes are null; else an instance of {@link DiscoverableWrapper}
+   */
+  private static Discoverable decode(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    String content = new String(bytes, Charsets.UTF_8);
+    return new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec())
+      .create()
+      .fromJson(content, Discoverable.class);
+  }
+
+  /**
+   * Static helper function for encoding an instance of {@link DiscoverableWrapper} into array of bytes.
+   * @param discoverable An instance of {@link Discoverable}
+   * @return array of bytes representing an instance of <code>discoverable</code>
+   */
+  private static byte[] encode(Discoverable discoverable) {
+    return new GsonBuilder().registerTypeAdapter(DiscoverableWrapper.class, new DiscoverableCodec())
+      .create()
+      .toJson(discoverable, DiscoverableWrapper.class)
+      .getBytes(Charsets.UTF_8);
+  }
+
+  /**
+   * Inner class for cancelling (un-register) discovery service.
+   */
+  private final class DiscoveryCancellable implements Cancellable {
+
+    private final Discoverable discoverable;
+    private final AtomicBoolean cancelled;
+    private volatile String path;
+
+    DiscoveryCancellable(Discoverable discoverable) {
+      this.discoverable = discoverable;
+      this.cancelled = new AtomicBoolean();
+    }
+
+    /**
+     * Set the zk node path representing the ephemeral sequence node of this registered discoverable.
+     * Called from ZK event thread when creating of the node completed, either from normal registration or
+     * re-registration due to session expiration.
+     *
+     * @param path The path to ephemeral sequence node.
+     */
+    void setPath(String path) {
+      this.path = path;
+      if (cancelled.get() && path != null) {
+        // Simply delete the path if it's already cancelled
+        // It's for the case when session expire happened and re-registration completed after this has been cancelled.
+        // Not bother with the result as if there is error, nothing much we could do.
+        zkClient.delete(path);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      if (!cancelled.compareAndSet(false, true)) {
+        return;
+      }
+
+      // Take a snapshot of the volatile path.
+      String path = this.path;
+
+      // If it is null, meaning cancel() is called before the ephemeral node is created, hence
+      // setPath() will be called in future (through zk callback when creation is completed)
+      // so that deletion will be done in setPath().
+      if (path == null) {
+        return;
+      }
+
+      // Remove this Cancellable from the map so that upon session expiration won't try to register.
+      lock.lock();
+      try {
+        discoverables.remove(discoverable, this);
+      } finally {
+        lock.unlock();
+      }
+
+      // Delete the path. It's ok if the path not exists
+      // (e.g. what session expired and before node has been re-created)
+      Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
+                                                    KeeperException.NoNodeException.class, path));
+      LOG.debug("Service unregistered: {} {}", discoverable, path);
+    }
+  }
+
+  /**
+   * SerDe for converting a {@link DiscoverableWrapper} into a JSON object
+   * or from a JSON object into {@link DiscoverableWrapper}.
+   */
+  private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
+
+    @Override
+    public Discoverable deserialize(JsonElement json, Type typeOfT,
+                                    JsonDeserializationContext context) throws JsonParseException {
+      JsonObject jsonObj = json.getAsJsonObject();
+      final String service = jsonObj.get("service").getAsString();
+      String hostname = jsonObj.get("hostname").getAsString();
+      int port = jsonObj.get("port").getAsInt();
+      final InetSocketAddress address = new InetSocketAddress(hostname, port);
+      return new Discoverable() {
+        @Override
+        public String getName() {
+          return service;
+        }
+
+        @Override
+        public InetSocketAddress getSocketAddress() {
+          return address;
+        }
+      };
+    }
+
+    @Override
+    public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
+      JsonObject jsonObj = new JsonObject();
+      jsonObj.addProperty("service", src.getName());
+      jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
+      jsonObj.addProperty("port", src.getSocketAddress().getPort());
+      return jsonObj;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
new file mode 100644
index 0000000..a1d6e0c
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes in this package provides service discovery implementations.
+ */
+package org.apache.twill.discovery;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
new file mode 100644
index 0000000..d8cc375
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test memory based service discovery service.
+ */
+public class InMemoryDiscoveryServiceTest {
+  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+    return service.register(new Discoverable() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return new InetSocketAddress(host, port);
+      }
+    });
+  }
+
+  @Test
+  public void simpleDiscoverable() throws Exception {
+    DiscoveryService discoveryService = new InMemoryDiscoveryService();
+    DiscoveryServiceClient discoveryServiceClient = (DiscoveryServiceClient) discoveryService;
+
+    // Register one service running on one host:port
+    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
+
+    // Discover that registered host:port.
+    Assert.assertTrue(Iterables.size(discoverables) == 1);
+
+    // Remove the service
+    cancellable.cancel();
+
+    // There should be no service.
+    discoverables = discoveryServiceClient.discover("foo");
+    TimeUnit.MILLISECONDS.sleep(100);
+    Assert.assertTrue(Iterables.size(discoverables) == 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
new file mode 100644
index 0000000..feee8db
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Services;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.internal.zookeeper.KillZKSession;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test Zookeeper based discovery service.
+ */
+public class ZKDiscoveryServiceTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryServiceTest.class);
+
+  private static InMemoryZKServer zkServer;
+  private static ZKClientService zkClient;
+
+  @BeforeClass
+  public static void beforeClass() {
+    zkServer = InMemoryZKServer.builder().setTickTime(100000).build();
+    zkServer.startAndWait();
+
+    zkClient = ZKClientServices.delegate(
+      ZKClients.retryOnFailure(
+        ZKClients.reWatchOnExpire(
+          ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+        RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+    zkClient.startAndWait();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    Futures.getUnchecked(Services.chainStop(zkClient, zkServer));
+  }
+
+  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+    return service.register(new Discoverable() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return new InetSocketAddress(host, port);
+      }
+    });
+  }
+
+
+  private boolean waitTillExpected(int expected, Iterable<Discoverable> discoverables) throws Exception {
+    for (int i = 0; i < 10; ++i) {
+      TimeUnit.MILLISECONDS.sleep(10);
+      if (Iterables.size(discoverables) == expected) {
+        return true;
+      }
+    }
+    return (Iterables.size(discoverables) == expected);
+  }
+
+  @Test (timeout = 5000)
+  public void testDoubleRegister() throws Exception {
+    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = discoveryService;
+
+    // Register on the same host port, it shouldn't fail.
+    Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
+    Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_double_reg");
+
+    Assert.assertTrue(waitTillExpected(1, discoverables));
+
+    cancellable.cancel();
+    cancellable2.cancel();
+
+    // Register again with two different clients, but killing session of the first one.
+    final ZKClientService zkClient2 = ZKClientServices.delegate(
+      ZKClients.retryOnFailure(
+        ZKClients.reWatchOnExpire(
+          ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+        RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+    zkClient2.startAndWait();
+
+    try {
+      ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
+      cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
+
+      // Schedule a thread to shutdown zkClient2.
+      new Thread() {
+        @Override
+        public void run() {
+          try {
+            TimeUnit.SECONDS.sleep(2);
+            zkClient2.stopAndWait();
+          } catch (InterruptedException e) {
+            LOG.error(e.getMessage(), e);
+          }
+        }
+      }.start();
+
+      // This call would block until zkClient2 is shutdown.
+      cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
+      cancellable.cancel();
+
+    } finally {
+      zkClient2.stopAndWait();
+    }
+  }
+
+  @Test
+  public void testSessionExpires() throws Exception {
+    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = discoveryService;
+
+    Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_expires");
+
+    // Discover that registered host:port.
+    Assert.assertTrue(waitTillExpected(1, discoverables));
+
+    KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 5000);
+
+    // Register one more endpoint to make sure state has been reflected after reconnection
+    Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
+
+    // Reconnection would trigger re-registration.
+    Assert.assertTrue(waitTillExpected(2, discoverables));
+
+    cancellable.cancel();
+    cancellable2.cancel();
+
+    // Verify that both are now gone.
+    Assert.assertTrue(waitTillExpected(0, discoverables));
+  }
+
+  @Test
+  public void simpleDiscoverable() throws Exception {
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
+
+    // Register one service running on one host:port
+    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
+
+    // Discover that registered host:port.
+    Assert.assertTrue(waitTillExpected(1, discoverables));
+
+    // Remove the service
+    cancellable.cancel();
+
+    // There should be no service.
+
+    discoverables = discoveryServiceClient.discover("foo");
+
+    Assert.assertTrue(waitTillExpected(0, discoverables));
+  }
+
+  @Test
+  public void manySameDiscoverable() throws Exception {
+    List<Cancellable> cancellables = Lists.newArrayList();
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
+
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("manyDiscoverable");
+    Assert.assertTrue(waitTillExpected(5, discoverables));
+
+    for (int i = 0; i < 5; i++) {
+      cancellables.get(i).cancel();
+      Assert.assertTrue(waitTillExpected(4 - i, discoverables));
+    }
+  }
+
+  @Test
+  public void multiServiceDiscoverable() throws Exception {
+    List<Cancellable> cancellables = Lists.newArrayList();
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
+
+    cancellables.add(register(discoveryService, "service1", "localhost", 1));
+    cancellables.add(register(discoveryService, "service1", "localhost", 2));
+    cancellables.add(register(discoveryService, "service1", "localhost", 3));
+    cancellables.add(register(discoveryService, "service1", "localhost", 4));
+    cancellables.add(register(discoveryService, "service1", "localhost", 5));
+
+    cancellables.add(register(discoveryService, "service2", "localhost", 1));
+    cancellables.add(register(discoveryService, "service2", "localhost", 2));
+    cancellables.add(register(discoveryService, "service2", "localhost", 3));
+
+    cancellables.add(register(discoveryService, "service3", "localhost", 1));
+    cancellables.add(register(discoveryService, "service3", "localhost", 2));
+
+    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("service1");
+    Assert.assertTrue(waitTillExpected(5, discoverables));
+
+    discoverables = discoveryServiceClient.discover("service2");
+    Assert.assertTrue(waitTillExpected(3, discoverables));
+
+    discoverables = discoveryServiceClient.discover("service3");
+    Assert.assertTrue(waitTillExpected(2, discoverables));
+
+    cancellables.add(register(discoveryService, "service3", "localhost", 3));
+    Assert.assertTrue(waitTillExpected(3, discoverables)); // Shows live iterator.
+
+    for (Cancellable cancellable : cancellables) {
+      cancellable.cancel();
+    }
+
+    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
+    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
+    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-discovery-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/resources/logback-test.xml b/twill-discovery-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..2615cb4
--- /dev/null
+++ b/twill-discovery-core/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.apache.twill" level="DEBUG" />
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
new file mode 100644
index 0000000..b11bc7a
--- /dev/null
+++ b/twill-yarn/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-yarn</artifactId>
+    <name>Twill Apache Hadoop YARN library</name>
+
+    <properties>
+        <output.dir>target/classes</output.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-discovery-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <outputDirectory>${output.dir}</outputDirectory>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2.0</id>
+            <properties>
+                <output.dir>${hadoop20.output.dir}</output.dir>
+            </properties>
+        </profile>
+        <profile>
+            <id>hadoop-2.1</id>
+            <build>
+                <resources>
+                    <resource>
+                        <directory>${hadoop20.output.dir}</directory>
+                    </resource>
+                    <resource>
+                        <directory>src/main/resources</directory>
+                    </resource>
+                </resources>
+            </build>
+        </profile>
+        <profile>
+            <id>hadoop-2.2</id>
+            <build>
+                <resources>
+                    <resource>
+                        <directory>${hadoop20.output.dir}</directory>
+                    </resource>
+                    <resource>
+                        <directory>src/main/resources</directory>
+                    </resource>
+                </resources>
+            </build>
+        </profile>
+    </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
new file mode 100644
index 0000000..d98dee1
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import org.apache.twill.internal.yarn.ports.AMRMClient;
+import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
+import org.apache.twill.internal.yarn.ports.AllocationResponse;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ *
+ */
+public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
+  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+
+  static {
+    STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
+      @Override
+      public YarnContainerStatus apply(ContainerStatus status) {
+        return new Hadoop20YarnContainerStatus(status);
+      }
+    };
+  }
+
+  private final ContainerId containerId;
+  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+  private final AMRMClient amrmClient;
+  private final YarnNMClient nmClient;
+  private InetSocketAddress trackerAddr;
+  private URL trackerUrl;
+  private Resource maxCapability;
+  private Resource minCapability;
+
+  public Hadoop20YarnAMClient(Configuration conf) {
+    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+
+    this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
+    this.amrmClient.init(conf);
+    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+    amrmClient.start();
+
+    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
+                                                                                      trackerAddr.getPort(),
+                                                                                      trackerUrl.toString());
+    maxCapability = response.getMaximumResourceCapability();
+    minCapability = response.getMinimumResourceCapability();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+    amrmClient.stop();
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  @Override
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
+  }
+
+  @Override
+  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  @Override
+  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    AllocationResponse response = amrmClient.allocate(progress);
+    List<ProcessLauncher<YarnContainerInfo>> launchers
+      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+
+    for (Container container : response.getAllocatedContainers()) {
+      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
+    }
+
+    if (!launchers.isEmpty()) {
+      handler.acquired(launchers);
+
+      // If no process has been launched through the given launcher, return the container.
+      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+        // This cast always works.
+        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+        if (!launcher.isLaunched()) {
+          Container container = launcher.getContainerInfo().getContainer();
+          LOG.info("Nothing to run in container, releasing it: {}", container);
+          amrmClient.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+
+    List<YarnContainerStatus> completed = ImmutableList.copyOf(
+      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+    if (!completed.isEmpty()) {
+      handler.completed(completed);
+    }
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (Hadoop20YarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          for (int i = 0; i < count; i++) {
+            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
+                                                                                  priority, 1);
+            containerRequests.put(id, request);
+            amrmClient.addContainerRequest(request);
+          }
+
+          return id;
+        }
+      }
+    };
+  }
+
+  @Override
+  public synchronized void completeContainerRequest(String id) {
+    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+      amrmClient.removeContainerRequest(request);
+    }
+  }
+
+  private Resource adjustCapability(Resource resource) {
+    int cores = YarnUtils.getVirtualCores(resource);
+    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
+                                YarnUtils.getVirtualCores(minCapability));
+    // Try and set the virtual cores, which older versions of YARN don't support this.
+    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+    }
+
+    int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
+    int minMemory = minCapability.getMemory();
+    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+
+    if (resource.getMemory() != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
+    }
+
+    return resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
new file mode 100644
index 0000000..bfec34e
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationSubmitter;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ *
+ */
+public final class Hadoop20YarnAppClient extends AbstractIdleService implements YarnAppClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAppClient.class);
+  private final YarnClient yarnClient;
+  private String user;
+
+  public Hadoop20YarnAppClient(Configuration configuration) {
+    this.yarnClient = new YarnClientImpl();
+    yarnClient.init(configuration);
+    this.user = System.getProperty("user.name");
+  }
+
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
+    // Request for new application
+    final GetNewApplicationResponse response = yarnClient.getNewApplication();
+    final ApplicationId appId = response.getApplicationId();
+
+    // Setup the context for application submission
+    final ApplicationSubmissionContext appSubmissionContext = Records.newRecord(ApplicationSubmissionContext.class);
+    appSubmissionContext.setApplicationId(appId);
+    appSubmissionContext.setApplicationName(twillSpec.getName());
+    appSubmissionContext.setUser(user);
+
+    ApplicationSubmitter submitter = new ApplicationSubmitter() {
+
+      @Override
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
+        ContainerLaunchContext context = launchContext.getLaunchContext();
+        addRMToken(context);
+        context.setUser(appSubmissionContext.getUser());
+        context.setResource(adjustMemory(response, capability));
+        appSubmissionContext.setAMContainerSpec(context);
+
+        try {
+          yarnClient.submitApplication(appSubmissionContext);
+          return new ProcessControllerImpl(yarnClient, appId);
+        } catch (YarnRemoteException e) {
+          LOG.error("Failed to submit application {}", appId, e);
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+
+    return new ApplicationMasterProcessLauncher(appId, submitter);
+  }
+
+  private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
+    int minMemory = response.getMinimumResourceCapability().getMemory();
+
+    int updatedMemory = Math.min(capability.getMemory(), response.getMaximumResourceCapability().getMemory());
+    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+
+    if (updatedMemory != capability.getMemory()) {
+      capability.setMemory(updatedMemory);
+    }
+
+    return capability;
+  }
+
+  private void addRMToken(ContainerLaunchContext context) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    try {
+      Credentials credentials = YarnUtils.decodeCredentials(context.getContainerTokens());
+
+      Configuration config = yarnClient.getConfig();
+      Token<TokenIdentifier> token = convertToken(
+        yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
+        YarnUtils.getRMAddress(config));
+
+      LOG.info("Added RM delegation token {}", token);
+      credentials.addToken(token.getService(), token);
+
+      context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
+
+    } catch (Exception e) {
+      LOG.error("Fails to create credentials.", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private <T extends TokenIdentifier> Token<T> convertToken(DelegationToken protoToken, InetSocketAddress serviceAddr) {
+    Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
+                                  protoToken.getPassword().array(),
+                                  new Text(protoToken.getKind()),
+                                  new Text(protoToken.getService()));
+    if (serviceAddr != null) {
+      SecurityUtil.setTokenService(token, serviceAddr);
+    }
+    return token;
+  }
+
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
+    this.user = user;
+    return createLauncher(twillSpec);
+  }
+
+  @Override
+  public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
+    return new ProcessControllerImpl(yarnClient, appId);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    yarnClient.start();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    yarnClient.stop();
+  }
+
+  private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
+    private final YarnClient yarnClient;
+    private final ApplicationId appId;
+
+    public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
+      this.yarnClient = yarnClient;
+      this.appId = appId;
+    }
+
+    @Override
+    public YarnApplicationReport getReport() {
+      try {
+        return new Hadoop20YarnApplicationReport(yarnClient.getApplicationReport(appId));
+      } catch (YarnRemoteException e) {
+        LOG.error("Failed to get application report {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      try {
+        yarnClient.killApplication(appId);
+      } catch (YarnRemoteException e) {
+        LOG.error("Failed to kill application {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
new file mode 100644
index 0000000..6c1b764
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ *
+ */
+public final class Hadoop20YarnApplicationReport implements YarnApplicationReport {
+
+  private final ApplicationReport report;
+
+  public Hadoop20YarnApplicationReport(ApplicationReport report) {
+    this.report = report;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return report.getApplicationId();
+  }
+
+  @Override
+  public ApplicationAttemptId getCurrentApplicationAttemptId() {
+    return report.getCurrentApplicationAttemptId();
+  }
+
+  @Override
+  public String getQueue() {
+    return report.getQueue();
+  }
+
+  @Override
+  public String getName() {
+    return report.getName();
+  }
+
+  @Override
+  public String getHost() {
+    return report.getHost();
+  }
+
+  @Override
+  public int getRpcPort() {
+    return report.getRpcPort();
+  }
+
+  @Override
+  public YarnApplicationState getYarnApplicationState() {
+    return report.getYarnApplicationState();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return report.getDiagnostics();
+  }
+
+  @Override
+  public String getTrackingUrl() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public String getOriginalTrackingUrl() {
+    return report.getOriginalTrackingUrl();
+  }
+
+  @Override
+  public long getStartTime() {
+    return report.getStartTime();
+  }
+
+  @Override
+  public long getFinishTime() {
+    return report.getFinishTime();
+  }
+
+  @Override
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return report.getFinalApplicationStatus();
+  }
+
+  @Override
+  public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
+    return report.getApplicationResourceUsageReport();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
new file mode 100644
index 0000000..79b2cb5
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ *
+ */
+public final class Hadoop20YarnContainerInfo implements YarnContainerInfo {
+
+  private final Container container;
+
+  public Hadoop20YarnContainerInfo(Container container) {
+    this.container = container;
+  }
+
+  @Override
+  public <T> T getContainer() {
+    return (T) container;
+  }
+
+  @Override
+  public String getId() {
+    return container.getId().toString();
+  }
+
+  @Override
+  public InetAddress getHost() {
+    try {
+      return InetAddress.getByName(container.getNodeId().getHost());
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public int getPort() {
+    return container.getNodeId().getPort();
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return container.getResource().getMemory();
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return YarnUtils.getVirtualCores(container.getResource());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
new file mode 100644
index 0000000..cc61856
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ *
+ */
+public final class Hadoop20YarnContainerStatus implements YarnContainerStatus {
+
+  private final ContainerStatus containerStatus;
+
+  public Hadoop20YarnContainerStatus(ContainerStatus containerStatus) {
+    this.containerStatus = containerStatus;
+  }
+
+  @Override
+  public String getContainerId() {
+    return containerStatus.getContainerId().toString();
+  }
+
+  @Override
+  public ContainerState getState() {
+    return containerStatus.getState();
+  }
+
+  @Override
+  public int getExitStatus() {
+    return containerStatus.getExitStatus();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return containerStatus.getDiagnostics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
new file mode 100644
index 0000000..b1f6d66
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public final class Hadoop20YarnLaunchContext implements YarnLaunchContext {
+
+  private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM;
+
+  static {
+    // Creates transform function from YarnLocalResource -> LocalResource
+    RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
+      @Override
+      public LocalResource apply(YarnLocalResource input) {
+        return input.getLocalResource();
+      }
+    };
+  }
+
+  private final ContainerLaunchContext launchContext;
+
+  public Hadoop20YarnLaunchContext() {
+    launchContext = Records.newRecord(ContainerLaunchContext.class);
+  }
+
+  @Override
+  public <T> T getLaunchContext() {
+    return (T) launchContext;
+  }
+
+  @Override
+  public void setCredentials(Credentials credentials) {
+    launchContext.setContainerTokens(YarnUtils.encodeCredentials(credentials));
+  }
+
+  @Override
+  public void setLocalResources(Map<String, YarnLocalResource> localResources) {
+    launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
+  }
+
+  @Override
+  public void setServiceData(Map<String, ByteBuffer> serviceData) {
+    launchContext.setServiceData(serviceData);
+  }
+
+  @Override
+  public Map<String, String> getEnvironment() {
+    return launchContext.getEnvironment();
+  }
+
+  @Override
+  public void setEnvironment(Map<String, String> environment) {
+    launchContext.setEnvironment(environment);
+  }
+
+  @Override
+  public List<String> getCommands() {
+    return launchContext.getCommands();
+  }
+
+  @Override
+  public void setCommands(List<String> commands) {
+    launchContext.setCommands(commands);
+  }
+
+  @Override
+  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
+    launchContext.setApplicationACLs(acls);
+  }
+}