You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:50 UTC

[08/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
new file mode 100644
index 0000000..6dcd1a7
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Collection of helper methods for common operations that are usually needed when interacting with ZooKeeper.
+ */
+public final class ZKOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZKOperations.class);
+
+  /**
+   * Represents a callback that receives the updated result of a watched ZK operation.
+   * @param <T> Type of updated data.
+   */
+  public interface Callback<T> {
+    /**
+     * Invoked with the latest result of the watched operation.
+     * @param data The updated data.
+     */
+    void updated(T data);
+  }
+
+  /**
+   * Interface for defining callback method to receive node data updates.
+   */
+  public interface DataCallback extends Callback<NodeData> {
+    /**
+     * Invoked when data of the node changed.
+     * @param nodeData New data of the node, or {@code null} if the node has been deleted.
+     *                 NOTE(review): the watch helper in this class does not appear to invoke the callback
+     *                 when the node is missing (it waits for the node to be created instead) - confirm
+     *                 whether {@code null} is ever actually delivered.
+     */
+    @Override
+    void updated(NodeData nodeData);
+  }
+
+  /**
+   * Interface for defining callback method to receive children nodes updates.
+   */
+  public interface ChildrenCallback extends Callback<NodeChildren> {
+    /**
+     * Invoked with the latest children of the watched node.
+     * @param nodeChildren The result of the most recent children lookup.
+     */
+    @Override
+    void updated(NodeChildren nodeChildren);
+  }
+
+  /**
+   * A watchable ZooKeeper read operation, used by {@code watchChanges} so that the same
+   * watch-and-rewatch logic can be shared between different kinds of lookups.
+   * @param <T> Type of the operation result.
+   */
+  private interface Operation<T> {
+    /**
+     * @return The {@link ZKClient} the operation is executed with.
+     */
+    ZKClient getZKClient();
+
+    /**
+     * Executes the operation on the given path, registering the given watcher.
+     * @param path Path to operate on.
+     * @param watcher Watcher to register with the operation.
+     * @return An {@link OperationFuture} carrying the operation result.
+     */
+    OperationFuture<T> exec(String path, Watcher watcher);
+  }
+
+  /**
+   * Watch for data changes of the given path. The callback will be triggered whenever a change has been
+   * detected. Note that the callback won't see every single change, as that's not guaranteed by ZooKeeper.
+   * If the node doesn't exist, it will watch for its creation and then start watching for data changes.
+   * When the node is deleted afterwards, it will again watch for its creation and resume watching for
+   * data changes once the node is recreated.
+   *
+   * @param zkClient The {@link ZKClient} for the operation
+   * @param path Path to watch
+   * @param callback Callback to be invoked when data changes is detected.
+   * @return A {@link Cancellable} to cancel the watch.
+   */
+  public static Cancellable watchData(final ZKClient zkClient, final String path, final DataCallback callback) {
+    // Shared flag checked by watchChanges before re-watching or notifying the callback.
+    final AtomicBoolean cancelFlag = new AtomicBoolean(false);
+
+    // Adapts ZKClient.getData to the generic watchable operation.
+    final Operation<NodeData> getDataOperation = new Operation<NodeData>() {
+      @Override
+      public ZKClient getZKClient() {
+        return zkClient;
+      }
+
+      @Override
+      public OperationFuture<NodeData> exec(String nodePath, Watcher watcher) {
+        return zkClient.getData(nodePath, watcher);
+      }
+    };
+    watchChanges(getDataOperation, path, callback, cancelFlag);
+
+    return new Cancellable() {
+      @Override
+      public void cancel() {
+        cancelFlag.set(true);
+      }
+    };
+  }
+
+  /**
+   * Watches the given path until the node no longer exists.
+   *
+   * @param zkClient The {@link ZKClient} for the operation
+   * @param path Path to watch
+   * @return A {@link ListenableFuture} that completes with the path once the node is deleted (or is found
+   *         to be absent), or fails if the existence check fails.
+   */
+  public static ListenableFuture<String> watchDeleted(final ZKClient zkClient, final String path) {
+    final SettableFuture<String> deleted = SettableFuture.create();
+    watchDeleted(zkClient, path, deleted);
+    return deleted;
+  }
+
+  /**
+   * Watches the given path and completes the given future once the node no longer exists.
+   *
+   * @param zkClient The {@link ZKClient} for the operation
+   * @param path Path to watch
+   * @param completion Future that is set to the path when the node is deleted (or is found to be absent),
+   *                   or failed if the existence check fails.
+   */
+  public static void watchDeleted(final ZKClient zkClient, final String path,
+                                  final SettableFuture<String> completion) {
+
+    final Watcher deletionWatcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (completion.isDone()) {
+          // Already completed or failed; nothing left to watch for.
+          return;
+        }
+        if (event.getType() == Event.EventType.NodeDeleted) {
+          completion.set(path);
+          return;
+        }
+        // Any other event: set up the watch again.
+        watchDeleted(zkClient, path, completion);
+      }
+    };
+
+    final FutureCallback<Stat> existsCallback = new FutureCallback<Stat>() {
+      @Override
+      public void onSuccess(Stat result) {
+        // A null Stat means the node is not there, which counts as deleted.
+        if (result == null) {
+          completion.set(path);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        completion.setException(t);
+      }
+    };
+
+    Futures.addCallback(zkClient.exists(path, deletionWatcher), existsCallback);
+  }
+
+  /**
+   * Watch for children changes of the given path. The callback receives the result of each children lookup
+   * until the returned {@link Cancellable} is cancelled.
+   *
+   * @param zkClient The {@link ZKClient} for the operation
+   * @param path Path to watch
+   * @param callback Callback to be invoked with the children of the node.
+   * @return A {@link Cancellable} to cancel the watch.
+   */
+  public static Cancellable watchChildren(final ZKClient zkClient, String path, ChildrenCallback callback) {
+    // Shared flag checked by watchChanges before re-watching or notifying the callback.
+    final AtomicBoolean cancelFlag = new AtomicBoolean(false);
+
+    // Adapts ZKClient.getChildren to the generic watchable operation.
+    final Operation<NodeChildren> getChildrenOperation = new Operation<NodeChildren>() {
+      @Override
+      public ZKClient getZKClient() {
+        return zkClient;
+      }
+
+      @Override
+      public OperationFuture<NodeChildren> exec(String nodePath, Watcher watcher) {
+        return zkClient.getChildren(nodePath, watcher);
+      }
+    };
+    watchChanges(getChildrenOperation, path, callback, cancelFlag);
+
+    return new Cancellable() {
+      @Override
+      public void cancel() {
+        cancelFlag.set(true);
+      }
+    };
+  }
+
+  /**
+   * Returns a new {@link OperationFuture} that the result will be the same as the given future, except that when
+   * the source future is having an exception matching the giving exception type, the errorResult will be set
+   * in to the returned {@link OperationFuture}.
+   * @param future The source future.
+   * @param exceptionType Type of {@link KeeperException} to be ignored.
+   * @param errorResult Object to be set into the resulting future on a matching exception.
+   * @param <V> Type of the result.
+   * @return A new {@link OperationFuture}.
+   */
+  public static <V> OperationFuture<V> ignoreError(OperationFuture<V> future,
+                                                   final Class<? extends KeeperException> exceptionType,
+                                                   final V errorResult) {
+    final SettableOperationFuture<V> resultFuture = SettableOperationFuture.create(future.getRequestPath(),
+                                                                                   Threads.SAME_THREAD_EXECUTOR);
+
+    // Relays the outcome of the source future, translating the ignored exception type into errorResult.
+    final FutureCallback<V> relay = new FutureCallback<V>() {
+      @Override
+      public void onSuccess(V result) {
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        if (exceptionType.isInstance(t)) {
+          // The failure is of the ignored type: complete normally with the substitute result.
+          resultFuture.set(errorResult);
+          return;
+        }
+        if (t instanceof CancellationException) {
+          // Propagate cancellation of the source future as cancellation of the result.
+          resultFuture.cancel(true);
+          return;
+        }
+        resultFuture.setException(t);
+      }
+    };
+    Futures.addCallback(future, relay, Threads.SAME_THREAD_EXECUTOR);
+
+    return resultFuture;
+  }
+
+  /**
+   * Deletes the given path recursively. The delete method will keep running until the given path is successfully
+   * removed, which means if there are new node created under the given path while deleting, they'll get deleted
+   * again.  If there is {@link KeeperException} during the deletion other than
+   * {@link KeeperException.NotEmptyException} or {@link KeeperException.NoNodeException},
+   * the exception would be reflected in the result future and deletion process will stop,
+   * leaving the given path with intermediate state.
+   *
+   * @param path The path to delete.
+   * @return An {@link OperationFuture} that will be completed when the given path is deleted or bailed due to
+   *         exception.
+   */
+  public static OperationFuture<String> recursiveDelete(final ZKClient zkClient, final String path) {
+    final SettableOperationFuture<String> resultFuture =
+      SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+
+    // Try to delete the given path.
+    Futures.addCallback(zkClient.delete(path), new FutureCallback<String>() {
+      private final FutureCallback<String> deleteCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        // Path deleted successfully. Operation done.
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // Failed to delete the given path
+        if (!(t instanceof KeeperException.NotEmptyException || t instanceof KeeperException.NoNodeException)) {
+          // For errors other than NotEmptyException, treat the operation as failed.
+          resultFuture.setException(t);
+          return;
+        }
+
+        // If failed because of NotEmptyException, get the list of children under the given path
+        Futures.addCallback(zkClient.getChildren(path), new FutureCallback<NodeChildren>() {
+
+          @Override
+          public void onSuccess(NodeChildren result) {
+            // Delete all children nodes recursively.
+            final List<OperationFuture<String>> deleteFutures = Lists.newLinkedList();
+            for (String child :result.getChildren()) {
+              deleteFutures.add(recursiveDelete(zkClient, path + "/" + child));
+            }
+
+            // When deletion of all children succeeded, delete the given path again.
+            Futures.successfulAsList(deleteFutures).addListener(new Runnable() {
+              @Override
+              public void run() {
+                for (OperationFuture<String> deleteFuture : deleteFutures) {
+                  try {
+                    // If any exception when deleting children, treat the operation as failed.
+                    deleteFuture.get();
+                  } catch (Exception e) {
+                    resultFuture.setException(e.getCause());
+                  }
+                }
+                Futures.addCallback(zkClient.delete(path), deleteCallback, Threads.SAME_THREAD_EXECUTOR);
+              }
+            }, Threads.SAME_THREAD_EXECUTOR);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            // If failed to get list of children, treat the operation as failed.
+            resultFuture.setException(t);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    return resultFuture;
+  }
+
+  /**
+   * Watch for the given path until it exists.
+   * @param zkClient The {@link ZKClient} to use.
+   * @param path A ZooKeeper path to watch for existent.
+   */
+  private static void watchExists(final ZKClient zkClient, final String path, final SettableFuture<String> completion) {
+    Futures.addCallback(zkClient.exists(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (!completion.isDone()) {
+          watchExists(zkClient, path, completion);
+        }
+      }
+    }), new FutureCallback<Stat>() {
+      @Override
+      public void onSuccess(Stat result) {
+        if (result != null) {
+          completion.set(path);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        completion.setException(t);
+      }
+    });
+  }
+
+  private static <T> void watchChanges(final Operation<T> operation, final String path,
+                                       final Callback<T> callback, final AtomicBoolean cancelled) {
+    Futures.addCallback(operation.exec(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (!cancelled.get()) {
+          watchChanges(operation, path, callback, cancelled);
+        }
+      }
+    }), new FutureCallback<T>() {
+      @Override
+      public void onSuccess(T result) {
+        if (!cancelled.get()) {
+          callback.updated(result);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
+          final SettableFuture<String> existCompletion = SettableFuture.create();
+          existCompletion.addListener(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                if (!cancelled.get()) {
+                  watchChanges(operation, existCompletion.get(), callback, cancelled);
+                }
+              } catch (Exception e) {
+                LOG.error("Failed to watch children for path " + path, e);
+              }
+            }
+          }, Threads.SAME_THREAD_EXECUTOR);
+          watchExists(operation.getZKClient(), path, existCompletion);
+          return;
+        }
+        LOG.error("Failed to watch data for path " + path + " " + t, t);
+      }
+    });
+  }
+
+  // Private constructor: this class only offers static helpers and is not meant to be instantiated.
+  private ZKOperations() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
new file mode 100644
index 0000000..e5bd237
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides functionality for ZooKeeper interactions.
+ */
+package org.apache.twill.zookeeper;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
new file mode 100644
index 0000000..601f0bd
--- /dev/null
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit tests for the {@link RetryStrategy} implementations created through {@link RetryStrategies}.
+ */
+public class RetryStrategyTest {
+
+  @Test
+  public void testNoRetry() {
+    RetryStrategy noRetry = RetryStrategies.noRetry();
+    long start = System.currentTimeMillis();
+    for (int failures = 1; failures <= 10; failures++) {
+      Assert.assertEquals(-1L, nextCreateRetry(noRetry, failures, start));
+    }
+  }
+
+  @Test
+  public void testLimit() {
+    RetryStrategy limited = RetryStrategies.limit(10, RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS));
+    long start = System.currentTimeMillis();
+    for (int failures = 1; failures <= 10; failures++) {
+      Assert.assertEquals(1L, nextCreateRetry(limited, failures, start));
+    }
+    // One failure beyond the limit: no more retries.
+    Assert.assertEquals(-1L, nextCreateRetry(limited, 11, start));
+  }
+
+  @Test
+  public void testUnlimited() {
+    RetryStrategy fixed = RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS);
+    long start = System.currentTimeMillis();
+    for (int failures = 1; failures <= 10; failures++) {
+      Assert.assertEquals(1L, nextCreateRetry(fixed, failures, start));
+    }
+    Assert.assertEquals(1L, nextCreateRetry(fixed, 100000, start));
+  }
+
+  @Test
+  public void testExponential() {
+    RetryStrategy exponential = RetryStrategies.exponentialDelay(1, 60000, TimeUnit.MILLISECONDS);
+    long start = System.currentTimeMillis();
+    // The delay doubles with every failure...
+    for (int failures = 1; failures <= 16; failures++) {
+      Assert.assertEquals(1L << (failures - 1), nextCreateRetry(exponential, failures, start));
+    }
+    // ...and is capped at the maximum delay.
+    for (int failures = 60; failures <= 80; failures++) {
+      Assert.assertEquals(60000, nextCreateRetry(exponential, failures, start));
+    }
+  }
+
+  @Test
+  public void testExponentialLimit() {
+    RetryStrategy exponential = RetryStrategies.exponentialDelay(1, 60000, TimeUnit.MILLISECONDS);
+    RetryStrategy limited = RetryStrategies.limit(99, exponential);
+    long start = System.currentTimeMillis();
+    for (int failures = 1; failures <= 16; failures++) {
+      Assert.assertEquals(1L << (failures - 1), nextCreateRetry(limited, failures, start));
+    }
+    for (int failures = 60; failures <= 80; failures++) {
+      Assert.assertEquals(60000, nextCreateRetry(limited, failures, start));
+    }
+    // One failure beyond the limit: no more retries.
+    Assert.assertEquals(-1L, nextCreateRetry(limited, 100, start));
+  }
+
+  @Test
+  public void testTimeLimit() throws InterruptedException {
+    RetryStrategy fixed = RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS);
+    RetryStrategy timeLimited = RetryStrategies.timeLimit(1, TimeUnit.SECONDS, fixed);
+    long start = System.currentTimeMillis();
+    Assert.assertEquals(1L, nextCreateRetry(timeLimited, 1, start));
+    // Sleep past the one second limit; the next call should refuse to retry.
+    TimeUnit.MILLISECONDS.sleep(1100);
+    Assert.assertEquals(-1L, nextCreateRetry(timeLimited, 2, start));
+  }
+
+  /**
+   * Asks the strategy for the next retry of a CREATE operation on the root path.
+   */
+  private static long nextCreateRetry(RetryStrategy strategy, int failureCount, long startTime) {
+    return strategy.nextRetry(failureCount, startTime, RetryStrategy.OperationType.CREATE, "/");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
new file mode 100644
index 0000000..f1db74a
--- /dev/null
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.internal.zookeeper.KillZKSession;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class ZKClientTest {
+
+  @Test
+  public void testChroot() throws Exception {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      // Connect with a "/chroot" suffix appended to the connection string.
+      String chrootConnectStr = zkServer.getConnectionStr() + "/chroot";
+      ZKClientService client = ZKClientService.Builder.of(chrootConnectStr).build();
+      client.startAndWait();
+      try {
+        // Issue both creations before waiting for either of them.
+        OperationFuture<String> createTest2 = client.create("/test1/test2", null, CreateMode.PERSISTENT);
+        OperationFuture<String> createTest3 = client.create("/test1/test3", null, CreateMode.PERSISTENT);
+        Futures.successfulAsList(ImmutableList.of(createTest2, createTest3)).get();
+
+        Assert.assertNotNull(client.exists("/test1/test2").get());
+        Assert.assertNotNull(client.exists("/test1/test3").get());
+
+      } finally {
+        client.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  @Test
+  public void testCreateParent() throws ExecutionException, InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      client.startAndWait();
+
+      try {
+        String path = client.create("/test1/test2/test3/test4/test5",
+                                    "testing".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL).get();
+        Assert.assertTrue(path.startsWith("/test1/test2/test3/test4/test5"));
+
+        String dataPath = "";
+        for (int i = 1; i <= 4; i++) {
+          dataPath = dataPath + "/test" + i;
+          Assert.assertNull(client.getData(dataPath).get().getData());
+        }
+        Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData(path).get().getData()));
+      } finally {
+        client.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  @Test
+  public void testGetChildren() throws ExecutionException, InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      client.startAndWait();
+
+      try {
+        // A freshly created node has no children.
+        client.create("/test", null, CreateMode.PERSISTENT).get();
+        NodeChildren initialChildren = client.getChildren("/test").get();
+        Assert.assertTrue(initialChildren.getChildren().isEmpty());
+
+        // Issue both child creations, then wait for both to succeed.
+        OperationFuture<String> createC1 = client.create("/test/c1", null, CreateMode.EPHEMERAL);
+        OperationFuture<String> createC2 = client.create("/test/c2", null, CreateMode.EPHEMERAL);
+        Futures.allAsList(ImmutableList.of(createC1, createC2)).get();
+
+        NodeChildren nodeChildren = client.getChildren("/test").get();
+        Assert.assertEquals(2, nodeChildren.getChildren().size());
+
+        // Compare as sets since the order of children is not asserted.
+        Assert.assertEquals(ImmutableSet.of("c1", "c2"), ImmutableSet.copyOf(nodeChildren.getChildren()));
+
+      } finally {
+        client.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  /**
+   * Verifies that data written with setData can be read back with getData.
+   */
+  @Test
+  public void testSetData() throws ExecutionException, InterruptedException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      client.startAndWait();
+
+      // Stop the client even when an assertion fails, as the other tests in this class do.
+      try {
+        client.create("/test", null, CreateMode.PERSISTENT).get();
+        Assert.assertNull(client.getData("/test").get().getData());
+
+        client.setData("/test", "testing".getBytes()).get();
+        Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData("/test").get().getData()));
+      } finally {
+        client.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  /**
+   * Verifies that a watcher registered through a client wrapped with {@code ZKClients.reWatchOnExpire}
+   * still receives events after the ZooKeeper session has been killed and the client has reconnected.
+   */
+  @Test
+  public void testExpireRewatch() throws InterruptedException, IOException, ExecutionException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      // Counted down on the first SyncConnected event seen after an Expired event.
+      final CountDownLatch expireReconnectLatch = new CountDownLatch(1);
+      final AtomicBoolean expired = new AtomicBoolean(false);
+      final ZKClientService client = ZKClientServices.delegate(ZKClients.reWatchOnExpire(
+                                        ZKClientService.Builder.of(zkServer.getConnectionStr())
+                                                       .setSessionTimeout(2000)
+                                                       .setConnectionWatcher(new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+              if (event.getState() == Event.KeeperState.Expired) {
+                expired.set(true);
+              // compareAndSet(true, true) leaves the value unchanged; it only tests that expired is true.
+              } else if (event.getState() == Event.KeeperState.SyncConnected && expired.compareAndSet(true, true)) {
+                expireReconnectLatch.countDown();
+              }
+            }
+          }).build()));
+      client.startAndWait();
+
+      try {
+        final BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<Watcher.Event.EventType>();
+        client.exists("/expireRewatch", new Watcher() {
+          @Override
+          public void process(WatchedEvent event) {
+            // Register this watcher again before recording the event, so later changes are seen too.
+            client.exists("/expireRewatch", this);
+            events.add(event.getType());
+          }
+        });
+
+        // The result of create is not awaited; the NodeCreated event below confirms it happened.
+        client.create("/expireRewatch", null, CreateMode.PERSISTENT);
+        Assert.assertEquals(Watcher.Event.EventType.NodeCreated, events.poll(2, TimeUnit.SECONDS));
+
+        KillZKSession.kill(client.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 1000);
+
+        // Wait until the client has seen the expiration and connected again.
+        Assert.assertTrue(expireReconnectLatch.await(5, TimeUnit.SECONDS));
+
+        // The watcher should still be notified about the deletion.
+        client.delete("/expireRewatch");
+        Assert.assertEquals(Watcher.Event.EventType.NodeDeleted, events.poll(4, TimeUnit.SECONDS));
+      } finally {
+        client.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+
+  /**
+   * Verifies that a create issued while the server is down succeeds once the server comes back, when the
+   * client is wrapped with {@code ZKClients.retryOnFailure}.
+   */
+  @Test
+  public void testRetry() throws ExecutionException, InterruptedException, TimeoutException {
+    File dataDir = Files.createTempDir();
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(dataDir).setTickTime(1000).build();
+    zkServer.startAndWait();
+    int port = zkServer.getLocalAddress().getPort();
+
+    final CountDownLatch disconnectLatch = new CountDownLatch(1);
+    ZKClientService client = ZKClientServices.delegate(ZKClients.retryOnFailure(
+      ZKClientService.Builder.of(zkServer.getConnectionStr()).setConnectionWatcher(new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getState() == Event.KeeperState.Disconnected) {
+          disconnectLatch.countDown();
+        }
+      }
+    }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS)));
+    client.startAndWait();
+
+    try {
+      // Take the server down and wait for the client to notice.
+      zkServer.stopAndWait();
+
+      Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
+
+      // Issue a create while the server is down.
+      final CountDownLatch createLatch = new CountDownLatch(1);
+      Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
+        @Override
+        public void onSuccess(String result) {
+          createLatch.countDown();
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          t.printStackTrace(System.out);
+        }
+      });
+
+      // Bring a server back on the same port and data directory.
+      TimeUnit.SECONDS.sleep(2);
+      zkServer = InMemoryZKServer.builder()
+                                 .setDataDir(dataDir)
+                                 .setAutoCleanDataDir(true)
+                                 .setPort(port)
+                                 .setTickTime(1000)
+                                 .build();
+      zkServer.startAndWait();
+
+      Assert.assertTrue(createLatch.await(5, TimeUnit.SECONDS));
+    } finally {
+      // Stop the client (previously never stopped) before the server, as the other tests do.
+      try {
+        client.stopAndWait();
+      } finally {
+        zkServer.stopAndWait();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
new file mode 100644
index 0000000..9518d6e
--- /dev/null
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Unit test for {@code ZKOperations.recursiveDelete}, run against an {@link InMemoryZKServer}.
+ */
+public class ZKOperationsTest {
+
+  @Test
+  public void recursiveDelete() throws ExecutionException, InterruptedException, TimeoutException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      client.startAndWait();
+
+      try {
+        client.create("/test1/test10/test101", null, CreateMode.PERSISTENT).get();
+        client.create("/test1/test10/test102", null, CreateMode.PERSISTENT).get();
+        client.create("/test1/test10/test103", null, CreateMode.PERSISTENT).get();
+
+        client.create("/test1/test11/test111", null, CreateMode.PERSISTENT).get();
+        client.create("/test1/test11/test112", null, CreateMode.PERSISTENT).get();
+        client.create("/test1/test11/test113", null, CreateMode.PERSISTENT).get();
+        // Delete from the top of the tree built above; the bounded get() fails the test instead of hanging.
+        ZKOperations.recursiveDelete(client, "/test1").get(2, TimeUnit.SECONDS);
+        // The test expects exists() to resolve to null once the node has been deleted.
+        Assert.assertNull(client.exists("/test1").get(2, TimeUnit.SECONDS));
+
+      } finally {
+        client.stopAndWait();
+      }
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/resources/logback-test.xml b/twill-zookeeper/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..157df6e
--- /dev/null
+++ b/twill-zookeeper/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.apache.hadoop" level="WARN" />
+    <logger name="org.apache.zookeeper" level="WARN" />
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
deleted file mode 100644
index b11bc7a..0000000
--- a/yarn/pom.xml
+++ /dev/null
@@ -1,127 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>twill-parent</artifactId>
-        <groupId>org.apache.twill</groupId>
-        <version>0.1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>twill-yarn</artifactId>
-    <name>Twill Apache Hadoop YARN library</name>
-
-    <properties>
-        <output.dir>target/classes</output.dir>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-discovery-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-common</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-minicluster</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <outputDirectory>${output.dir}</outputDirectory>
-    </build>
-
-    <profiles>
-        <profile>
-            <id>hadoop-2.0</id>
-            <properties>
-                <output.dir>${hadoop20.output.dir}</output.dir>
-            </properties>
-        </profile>
-        <profile>
-            <id>hadoop-2.1</id>
-            <build>
-                <resources>
-                    <resource>
-                        <directory>${hadoop20.output.dir}</directory>
-                    </resource>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                    </resource>
-                </resources>
-            </build>
-        </profile>
-        <profile>
-            <id>hadoop-2.2</id>
-            <build>
-                <resources>
-                    <resource>
-                        <directory>${hadoop20.output.dir}</directory>
-                    </resource>
-                    <resource>
-                        <directory>src/main/resources</directory>
-                    </resource>
-                </resources>
-            </build>
-        </profile>
-    </profiles>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
deleted file mode 100644
index d98dee1..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
-import org.apache.twill.internal.yarn.ports.AMRMClient;
-import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
-import org.apache.twill.internal.yarn.ports.AllocationResponse;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.List;
-import java.util.UUID;
-
-/**
- *
- */
-public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
-  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
-
-  static {
-    STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
-      @Override
-      public YarnContainerStatus apply(ContainerStatus status) {
-        return new Hadoop20YarnContainerStatus(status);
-      }
-    };
-  }
-
-  private final ContainerId containerId;
-  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
-  private final AMRMClient amrmClient;
-  private final YarnNMClient nmClient;
-  private InetSocketAddress trackerAddr;
-  private URL trackerUrl;
-  private Resource maxCapability;
-  private Resource minCapability;
-
-  public Hadoop20YarnAMClient(Configuration conf) {
-    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
-    Preconditions.checkArgument(masterContainerId != null,
-                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
-    this.containerId = ConverterUtils.toContainerId(masterContainerId);
-    this.containerRequests = ArrayListMultimap.create();
-
-    this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
-    this.amrmClient.init(conf);
-    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
-    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
-
-    amrmClient.start();
-
-    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
-                                                                                      trackerAddr.getPort(),
-                                                                                      trackerUrl.toString());
-    maxCapability = response.getMaximumResourceCapability();
-    minCapability = response.getMinimumResourceCapability();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
-    amrmClient.stop();
-  }
-
-  @Override
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  @Override
-  public String getHost() {
-    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
-  }
-
-  @Override
-  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
-    this.trackerAddr = trackerAddr;
-    this.trackerUrl = trackerUrl;
-  }
-
-  @Override
-  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
-    AllocationResponse response = amrmClient.allocate(progress);
-    List<ProcessLauncher<YarnContainerInfo>> launchers
-      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
-
-    for (Container container : response.getAllocatedContainers()) {
-      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
-    }
-
-    if (!launchers.isEmpty()) {
-      handler.acquired(launchers);
-
-      // If no process has been launched through the given launcher, return the container.
-      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
-        // This cast always works.
-        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
-        if (!launcher.isLaunched()) {
-          Container container = launcher.getContainerInfo().getContainer();
-          LOG.info("Nothing to run in container, releasing it: {}", container);
-          amrmClient.releaseAssignedContainer(container.getId());
-        }
-      }
-    }
-
-    List<YarnContainerStatus> completed = ImmutableList.copyOf(
-      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
-    if (!completed.isEmpty()) {
-      handler.completed(completed);
-    }
-  }
-
-  @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability) {
-    return addContainerRequest(capability, 1);
-  }
-
-  @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
-    return new ContainerRequestBuilder(adjustCapability(capability), count) {
-      @Override
-      public String apply() {
-        synchronized (Hadoop20YarnAMClient.this) {
-          String id = UUID.randomUUID().toString();
-
-          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
-          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
-
-          for (int i = 0; i < count; i++) {
-            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
-                                                                                  priority, 1);
-            containerRequests.put(id, request);
-            amrmClient.addContainerRequest(request);
-          }
-
-          return id;
-        }
-      }
-    };
-  }
-
-  @Override
-  public synchronized void completeContainerRequest(String id) {
-    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
-      amrmClient.removeContainerRequest(request);
-    }
-  }
-
-  private Resource adjustCapability(Resource resource) {
-    int cores = YarnUtils.getVirtualCores(resource);
-    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
-                                YarnUtils.getVirtualCores(minCapability));
-    // Try and set the virtual cores, which older versions of YARN don't support this.
-    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
-      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
-    }
-
-    int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
-    int minMemory = minCapability.getMemory();
-    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
-
-    if (resource.getMemory() != updatedMemory) {
-      resource.setMemory(updatedMemory);
-      LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
-    }
-
-    return resource;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
deleted file mode 100644
index bfec34e..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationSubmitter;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.Records;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-/**
- *
- */
-public final class Hadoop20YarnAppClient extends AbstractIdleService implements YarnAppClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAppClient.class);
-  private final YarnClient yarnClient;
-  private String user;
-
-  public Hadoop20YarnAppClient(Configuration configuration) {
-    this.yarnClient = new YarnClientImpl();
-    yarnClient.init(configuration);
-    this.user = System.getProperty("user.name");
-  }
-
-  @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
-    // Request for new application
-    final GetNewApplicationResponse response = yarnClient.getNewApplication();
-    final ApplicationId appId = response.getApplicationId();
-
-    // Setup the context for application submission
-    final ApplicationSubmissionContext appSubmissionContext = Records.newRecord(ApplicationSubmissionContext.class);
-    appSubmissionContext.setApplicationId(appId);
-    appSubmissionContext.setApplicationName(twillSpec.getName());
-    appSubmissionContext.setUser(user);
-
-    ApplicationSubmitter submitter = new ApplicationSubmitter() {
-
-      @Override
-      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
-        ContainerLaunchContext context = launchContext.getLaunchContext();
-        addRMToken(context);
-        context.setUser(appSubmissionContext.getUser());
-        context.setResource(adjustMemory(response, capability));
-        appSubmissionContext.setAMContainerSpec(context);
-
-        try {
-          yarnClient.submitApplication(appSubmissionContext);
-          return new ProcessControllerImpl(yarnClient, appId);
-        } catch (YarnRemoteException e) {
-          LOG.error("Failed to submit application {}", appId, e);
-          throw Throwables.propagate(e);
-        }
-      }
-    };
-
-    return new ApplicationMasterProcessLauncher(appId, submitter);
-  }
-
-  private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
-    int minMemory = response.getMinimumResourceCapability().getMemory();
-
-    int updatedMemory = Math.min(capability.getMemory(), response.getMaximumResourceCapability().getMemory());
-    updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
-
-    if (updatedMemory != capability.getMemory()) {
-      capability.setMemory(updatedMemory);
-    }
-
-    return capability;
-  }
-
-  private void addRMToken(ContainerLaunchContext context) {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return;
-    }
-
-    try {
-      Credentials credentials = YarnUtils.decodeCredentials(context.getContainerTokens());
-
-      Configuration config = yarnClient.getConfig();
-      Token<TokenIdentifier> token = convertToken(
-        yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
-        YarnUtils.getRMAddress(config));
-
-      LOG.info("Added RM delegation token {}", token);
-      credentials.addToken(token.getService(), token);
-
-      context.setContainerTokens(YarnUtils.encodeCredentials(credentials));
-
-    } catch (Exception e) {
-      LOG.error("Fails to create credentials.", e);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private <T extends TokenIdentifier> Token<T> convertToken(DelegationToken protoToken, InetSocketAddress serviceAddr) {
-    Token<T> token = new Token<T>(protoToken.getIdentifier().array(),
-                                  protoToken.getPassword().array(),
-                                  new Text(protoToken.getKind()),
-                                  new Text(protoToken.getService()));
-    if (serviceAddr != null) {
-      SecurityUtil.setTokenService(token, serviceAddr);
-    }
-    return token;
-  }
-
-  @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
-    this.user = user;
-    return createLauncher(twillSpec);
-  }
-
-  @Override
-  public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
-    return new ProcessControllerImpl(yarnClient, appId);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    yarnClient.start();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    yarnClient.stop();
-  }
-
-  private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
-    private final YarnClient yarnClient;
-    private final ApplicationId appId;
-
-    public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
-      this.yarnClient = yarnClient;
-      this.appId = appId;
-    }
-
-    @Override
-    public YarnApplicationReport getReport() {
-      try {
-        return new Hadoop20YarnApplicationReport(yarnClient.getApplicationReport(appId));
-      } catch (YarnRemoteException e) {
-        LOG.error("Failed to get application report {}", appId, e);
-        throw Throwables.propagate(e);
-      }
-    }
-
-    @Override
-    public void cancel() {
-      try {
-        yarnClient.killApplication(appId);
-      } catch (YarnRemoteException e) {
-        LOG.error("Failed to kill application {}", appId, e);
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
deleted file mode 100644
index 6c1b764..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-
-/**
- *
- */
-public final class Hadoop20YarnApplicationReport implements YarnApplicationReport {
-
-  private final ApplicationReport report;
-
-  public Hadoop20YarnApplicationReport(ApplicationReport report) {
-    this.report = report;
-  }
-
-  @Override
-  public ApplicationId getApplicationId() {
-    return report.getApplicationId();
-  }
-
-  @Override
-  public ApplicationAttemptId getCurrentApplicationAttemptId() {
-    return report.getCurrentApplicationAttemptId();
-  }
-
-  @Override
-  public String getQueue() {
-    return report.getQueue();
-  }
-
-  @Override
-  public String getName() {
-    return report.getName();
-  }
-
-  @Override
-  public String getHost() {
-    return report.getHost();
-  }
-
-  @Override
-  public int getRpcPort() {
-    return report.getRpcPort();
-  }
-
-  @Override
-  public YarnApplicationState getYarnApplicationState() {
-    return report.getYarnApplicationState();
-  }
-
-  @Override
-  public String getDiagnostics() {
-    return report.getDiagnostics();
-  }
-
-  @Override
-  public String getTrackingUrl() {
-    return report.getTrackingUrl();
-  }
-
-  @Override
-  public String getOriginalTrackingUrl() {
-    return report.getOriginalTrackingUrl();
-  }
-
-  @Override
-  public long getStartTime() {
-    return report.getStartTime();
-  }
-
-  @Override
-  public long getFinishTime() {
-    return report.getFinishTime();
-  }
-
-  @Override
-  public FinalApplicationStatus getFinalApplicationStatus() {
-    return report.getFinalApplicationStatus();
-  }
-
-  @Override
-  public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
-    return report.getApplicationResourceUsageReport();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
deleted file mode 100644
index 79b2cb5..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerInfo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.yarn.api.records.Container;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- *
- */
-public final class Hadoop20YarnContainerInfo implements YarnContainerInfo {
-
-  private final Container container;
-
-  public Hadoop20YarnContainerInfo(Container container) {
-    this.container = container;
-  }
-
-  @Override
-  public <T> T getContainer() {
-    return (T) container;
-  }
-
-  @Override
-  public String getId() {
-    return container.getId().toString();
-  }
-
-  @Override
-  public InetAddress getHost() {
-    try {
-      return InetAddress.getByName(container.getNodeId().getHost());
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public int getPort() {
-    return container.getNodeId().getPort();
-  }
-
-  @Override
-  public int getMemoryMB() {
-    return container.getResource().getMemory();
-  }
-
-  @Override
-  public int getVirtualCores() {
-    return YarnUtils.getVirtualCores(container.getResource());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
deleted file mode 100644
index cc61856..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnContainerStatus.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-
-/**
- *
- */
-public final class Hadoop20YarnContainerStatus implements YarnContainerStatus {
-
-  private final ContainerStatus containerStatus;
-
-  public Hadoop20YarnContainerStatus(ContainerStatus containerStatus) {
-    this.containerStatus = containerStatus;
-  }
-
-  @Override
-  public String getContainerId() {
-    return containerStatus.getContainerId().toString();
-  }
-
-  @Override
-  public ContainerState getState() {
-    return containerStatus.getState();
-  }
-
-  @Override
-  public int getExitStatus() {
-    return containerStatus.getExitStatus();
-  }
-
-  @Override
-  public String getDiagnostics() {
-    return containerStatus.getDiagnostics();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
deleted file mode 100644
index b1f6d66..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLaunchContext.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public final class Hadoop20YarnLaunchContext implements YarnLaunchContext {
-
-  private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM;
-
-  static {
-    // Creates transform function from YarnLocalResource -> LocalResource
-    RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
-      @Override
-      public LocalResource apply(YarnLocalResource input) {
-        return input.getLocalResource();
-      }
-    };
-  }
-
-  private final ContainerLaunchContext launchContext;
-
-  public Hadoop20YarnLaunchContext() {
-    launchContext = Records.newRecord(ContainerLaunchContext.class);
-  }
-
-  @Override
-  public <T> T getLaunchContext() {
-    return (T) launchContext;
-  }
-
-  @Override
-  public void setCredentials(Credentials credentials) {
-    launchContext.setContainerTokens(YarnUtils.encodeCredentials(credentials));
-  }
-
-  @Override
-  public void setLocalResources(Map<String, YarnLocalResource> localResources) {
-    launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
-  }
-
-  @Override
-  public void setServiceData(Map<String, ByteBuffer> serviceData) {
-    launchContext.setServiceData(serviceData);
-  }
-
-  @Override
-  public Map<String, String> getEnvironment() {
-    return launchContext.getEnvironment();
-  }
-
-  @Override
-  public void setEnvironment(Map<String, String> environment) {
-    launchContext.setEnvironment(environment);
-  }
-
-  @Override
-  public List<String> getCommands() {
-    return launchContext.getCommands();
-  }
-
-  @Override
-  public void setCommands(List<String> commands) {
-    launchContext.setCommands(commands);
-  }
-
-  @Override
-  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
-    launchContext.setApplicationACLs(acls);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
deleted file mode 100644
index b327b94..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- *
- */
-public final class Hadoop20YarnLocalResource implements YarnLocalResource {
-
-  private final LocalResource localResource;
-
-  public Hadoop20YarnLocalResource() {
-    this.localResource = Records.newRecord(LocalResource.class);
-  }
-
-  @Override
-  public <T> T getLocalResource() {
-    return (T) localResource;
-  }
-
-  @Override
-  public URL getResource() {
-    return localResource.getResource();
-  }
-
-  @Override
-  public void setResource(URL resource) {
-    localResource.setResource(resource);
-  }
-
-  @Override
-  public long getSize() {
-    return localResource.getSize();
-  }
-
-  @Override
-  public void setSize(long size) {
-    localResource.setSize(size);
-  }
-
-  @Override
-  public long getTimestamp() {
-    return localResource.getTimestamp();
-  }
-
-  @Override
-  public void setTimestamp(long timestamp) {
-    localResource.setTimestamp(timestamp);
-  }
-
-  @Override
-  public LocalResourceType getType() {
-    return localResource.getType();
-  }
-
-  @Override
-  public void setType(LocalResourceType type) {
-    localResource.setType(type);
-  }
-
-  @Override
-  public LocalResourceVisibility getVisibility() {
-    return localResource.getVisibility();
-  }
-
-  @Override
-  public void setVisibility(LocalResourceVisibility visibility) {
-    localResource.setVisibility(visibility);
-  }
-
-  @Override
-  public String getPattern() {
-    return localResource.getPattern();
-  }
-
-  @Override
-  public void setPattern(String pattern) {
-    localResource.setPattern(pattern);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
deleted file mode 100644
index 98ecc67..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.common.Cancellable;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.Records;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-/**
- *
- */
-public final class Hadoop20YarnNMClient implements YarnNMClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnNMClient.class);
-
-  private final YarnRPC yarnRPC;
-  private final Configuration yarnConf;
-
-  public Hadoop20YarnNMClient(YarnRPC yarnRPC, Configuration yarnConf) {
-    this.yarnRPC = yarnRPC;
-    this.yarnConf = yarnConf;
-  }
-
-  @Override
-  public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
-    ContainerLaunchContext context = launchContext.getLaunchContext();
-    context.setUser(System.getProperty("user.name"));
-
-    Container container = containerInfo.getContainer();
-
-    context.setContainerId(container.getId());
-    context.setResource(container.getResource());
-
-    StartContainerRequest startRequest = Records.newRecord(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(context);
-
-    ContainerManager manager = connectContainerManager(container);
-    try {
-      manager.startContainer(startRequest);
-      return new ContainerTerminator(container, manager);
-    } catch (YarnRemoteException e) {
-      LOG.error("Error in launching process", e);
-      throw Throwables.propagate(e);
-    }
-
-  }
-
-  /**
-   * Helper to connect to container manager (node manager).
-   */
-  private ContainerManager connectContainerManager(Container container) {
-    String cmIpPortStr = String.format("%s:%d", container.getNodeId().getHost(), container.getNodeId().getPort());
-    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
-    return ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddress, yarnConf));
-  }
-
-  private static final class ContainerTerminator implements Cancellable {
-
-    private final Container container;
-    private final ContainerManager manager;
-
-    private ContainerTerminator(Container container, ContainerManager manager) {
-      this.container = container;
-      this.manager = manager;
-    }
-
-    @Override
-    public void cancel() {
-      LOG.info("Request to stop container {}.", container.getId());
-      StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
-      stopRequest.setContainerId(container.getId());
-      try {
-        manager.stopContainer(stopRequest);
-        boolean completed = false;
-        while (!completed) {
-          GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class);
-          statusRequest.setContainerId(container.getId());
-          GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest);
-          LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
-
-          completed = (statusResponse.getStatus().getState() == ContainerState.COMPLETE);
-        }
-        LOG.info("Container {} stopped.", container.getId());
-      } catch (YarnRemoteException e) {
-        LOG.error("Fail to stop container {}", container.getId(), e);
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
deleted file mode 100644
index 26b6fa2..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.service.Service;
-
-/**
- * Ported from Apache Hadoop YARN.
- */
-public interface AMRMClient extends Service {
-
-  /**
-   * Value used to define no locality.
-   */
-  static final String ANY = "*";
-
-  /**
-   * Object to represent container request for resources.
-   * Resources may be localized to nodes and racks.
-   * Resources may be assigned priorities.
-   * Can ask for multiple containers of a given type.
-   */
-  public static class ContainerRequest {
-    Resource capability;
-    String[] hosts;
-    String[] racks;
-    Priority priority;
-    int containerCount;
-
-    public ContainerRequest(Resource capability, String[] hosts,
-                            String[] racks, Priority priority, int containerCount) {
-      this.capability = capability;
-      this.hosts = (hosts != null ? hosts.clone() : null);
-      this.racks = (racks != null ? racks.clone() : null);
-      this.priority = priority;
-      this.containerCount = containerCount;
-    }
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("Capability[").append(capability).append("]");
-      sb.append("Priority[").append(priority).append("]");
-      sb.append("ContainerCount[").append(containerCount).append("]");
-      return sb.toString();
-    }
-  }
-
-  /**
-   * Register the application master. This must be called before any
-   * other interaction
-   * @param appHostName Name of the host on which master is running
-   * @param appHostPort Port master is listening on
-   * @param appTrackingUrl URL at which the master info can be seen
-   * @return <code>RegisterApplicationMasterResponse</code>
-   * @throws org.apache.hadoop.yarn.exceptions.YarnRemoteException
-   */
-  public RegisterApplicationMasterResponse
-  registerApplicationMaster(String appHostName,
-                            int appHostPort,
-                            String appTrackingUrl)
-    throws YarnRemoteException;
-
-  /**
-   * Request additional containers and receive new container allocations.
-   * Requests made via <code>addContainerRequest</code> are sent to the
-   * <code>ResourceManager</code>. New containers assigned to the master are
-   * retrieved. Status of completed containers and node health updates are
-   * also retrieved.
-   * This also doubles as a heartbeat to the ResourceManager and must be
-   * made periodically.
-   * The call may not always return any new allocations of containers.
-   * App should not make concurrent allocate requests. May cause request loss.
-   * @param progressIndicator Indicates progress made by the master
-   * @return the response of the allocate request
-   * @throws YarnRemoteException
-   */
-  public AllocationResponse allocate(float progressIndicator)
-    throws YarnRemoteException;
-
-  /**
-   * Unregister the Application Master. This must be called in the end.
-   * @param appStatus Success/Failure status of the master
-   * @param appMessage Diagnostics message on failure
-   * @param appTrackingUrl New URL to get master info
-   * @throws YarnRemoteException
-   */
-  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-                                          String appMessage,
-                                          String appTrackingUrl)
-    throws YarnRemoteException;
-
-  /**
-   * Request containers for resources before calling <code>allocate</code>.
-   * @param req Resource request
-   */
-  public void addContainerRequest(ContainerRequest req);
-
-  /**
-   * Remove previous container request. The previous container request may have
-   * already been sent to the ResourceManager. So even after the remove request
-   * the app must be prepared to receive an allocation for the previous request
-   * even after the remove request
-   * @param req Resource request
-   */
-  public void removeContainerRequest(ContainerRequest req);
-
-  /**
-   * Release containers assigned by the Resource Manager. If the app cannot use
-   * the container or wants to give up the container then it can release it.
-   * The app needs to make new requests for the released resource capability if
-   * it still needs it. For example, if it released non-local resources
-   * @param containerId
-   */
-  public void releaseAssignedContainer(ContainerId containerId);
-
-  /**
-   * Get the currently available resources in the cluster.
-   * A valid value is available after a call to allocate has been made
-   * @return Currently available resources
-   */
-  public Resource getClusterAvailableResources();
-
-  /**
-   * Get the current number of nodes in the cluster.
-   * A valid values is available after a call to allocate has been made
-   * @return Current number of nodes in the cluster
-   */
-  public int getClusterNodeCount();
-}