You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:45 UTC

[03/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
deleted file mode 100644
index 131f90a..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillApplication;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.internal.EnvKeys;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.common.io.LineReader;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.Socket;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Using echo server to test resource reports.
- * This test is executed by {@link org.apache.twill.yarn.YarnTestSuite}.
- */
-public class ResourceReportTestRun {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ResourceReportTestRun.class);
-
-  private class ResourceApplication implements TwillApplication {
-    @Override
-    public TwillSpecification configure() {
-      return TwillSpecification.Builder.with()
-        .setName("ResourceApplication")
-        .withRunnable()
-          .add("echo1", new EchoServer(), ResourceSpecification.Builder.with()
-            .setVirtualCores(1)
-            .setMemory(128, ResourceSpecification.SizeUnit.MEGA)
-            .setInstances(2).build()).noLocalFiles()
-          .add("echo2", new EchoServer(), ResourceSpecification.Builder.with()
-            .setVirtualCores(2)
-            .setMemory(256, ResourceSpecification.SizeUnit.MEGA)
-            .setInstances(1).build()).noLocalFiles()
-        .anyOrder()
-        .build();
-    }
-  }
-
-  @Test
-  public void testRunnablesGetAllowedResourcesInEnv() throws InterruptedException, IOException,
-    TimeoutException, ExecutionException {
-    TwillRunner runner = YarnTestSuite.getTwillRunner();
-
-    ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
-      .setVirtualCores(1)
-      .setMemory(2048, ResourceSpecification.SizeUnit.MEGA)
-      .setInstances(1)
-      .build();
-    TwillController controller = runner.prepare(new EnvironmentEchoServer(), resourceSpec)
-      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-      .withApplicationArguments("envecho")
-      .withArguments("EnvironmentEchoServer", "echo2")
-      .start();
-
-    final CountDownLatch running = new CountDownLatch(1);
-    controller.addListener(new ServiceListenerAdapter() {
-      @Override
-      public void running() {
-        running.countDown();
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
-
-    Iterable<Discoverable> envEchoServices = controller.discoverService("envecho");
-    Assert.assertTrue(YarnTestSuite.waitForSize(envEchoServices, 1, 30));
-
-    // TODO: check virtual cores once yarn adds the ability
-    Map<String, String> expectedValues = Maps.newHashMap();
-    expectedValues.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, "2048");
-    expectedValues.put(EnvKeys.TWILL_INSTANCE_COUNT, "1");
-
-    // check environment of the runnable.
-    Discoverable discoverable = envEchoServices.iterator().next();
-    for (Map.Entry<String, String> expected : expectedValues.entrySet()) {
-      Socket socket = new Socket(discoverable.getSocketAddress().getHostName(),
-                                 discoverable.getSocketAddress().getPort());
-      try {
-        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
-        LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
-        writer.println(expected.getKey());
-        Assert.assertEquals(expected.getValue(), reader.readLine());
-      } finally {
-        socket.close();
-      }
-    }
-
-    controller.stop().get(30, TimeUnit.SECONDS);
-    // Sleep a bit before exiting.
-    TimeUnit.SECONDS.sleep(2);
-  }
-
-  @Test
-  public void testResourceReportWithFailingContainers() throws InterruptedException, IOException,
-    TimeoutException, ExecutionException {
-    TwillRunner runner = YarnTestSuite.getTwillRunner();
-
-    ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
-      .setVirtualCores(1)
-      .setMemory(128, ResourceSpecification.SizeUnit.MEGA)
-      .setInstances(2)
-      .build();
-    TwillController controller = runner.prepare(new BuggyServer(), resourceSpec)
-      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-      .withApplicationArguments("echo")
-      .withArguments("BuggyServer", "echo2")
-      .start();
-
-    final CountDownLatch running = new CountDownLatch(1);
-    controller.addListener(new ServiceListenerAdapter() {
-      @Override
-      public void running() {
-        running.countDown();
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
-
-    Iterable<Discoverable> echoServices = controller.discoverService("echo");
-    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
-    // check that we have 2 runnables.
-    ResourceReport report = controller.getResourceReport();
-    Assert.assertEquals(2, report.getRunnableResources("BuggyServer").size());
-
-    // cause a divide by 0 in one server
-    Discoverable discoverable = echoServices.iterator().next();
-    Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
-                               discoverable.getSocketAddress().getPort());
-    try {
-      PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
-      writer.println("0");
-    } finally {
-      socket.close();
-    }
-
-    // takes some time for app master to find out the container completed...
-    TimeUnit.SECONDS.sleep(5);
-    // check that we have 1 runnable, not 2.
-    report = controller.getResourceReport();
-    Assert.assertEquals(1, report.getRunnableResources("BuggyServer").size());
-
-    controller.stop().get(30, TimeUnit.SECONDS);
-    // Sleep a bit before exiting.
-    TimeUnit.SECONDS.sleep(2);
-  }
-
-  @Test
-  public void testResourceReport() throws InterruptedException, ExecutionException, IOException,
-    URISyntaxException, TimeoutException {
-    TwillRunner runner = YarnTestSuite.getTwillRunner();
-
-    TwillController controller = runner.prepare(new ResourceApplication())
-                                        .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-                                        .withApplicationArguments("echo")
-                                        .withArguments("echo1", "echo1")
-                                        .withArguments("echo2", "echo2")
-                                        .start();
-
-    final CountDownLatch running = new CountDownLatch(1);
-    controller.addListener(new ServiceListenerAdapter() {
-      @Override
-      public void running() {
-        running.countDown();
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
-
-    // wait for 3 echo servers to come up
-    Iterable<Discoverable> echoServices = controller.discoverService("echo");
-    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 3, 60));
-    ResourceReport report = controller.getResourceReport();
-    // make sure resources for echo1 and echo2 are there
-    Map<String, Collection<TwillRunResources>> usedResources = report.getResources();
-    Assert.assertEquals(2, usedResources.keySet().size());
-    Assert.assertTrue(usedResources.containsKey("echo1"));
-    Assert.assertTrue(usedResources.containsKey("echo2"));
-
-    Collection<TwillRunResources> echo1Resources = usedResources.get("echo1");
-    // 2 instances of echo1
-    Assert.assertEquals(2, echo1Resources.size());
-    // TODO: check cores after hadoop-2.1.0
-    for (TwillRunResources resources : echo1Resources) {
-      Assert.assertEquals(128, resources.getMemoryMB());
-    }
-
-    Collection<TwillRunResources> echo2Resources = usedResources.get("echo2");
-    // 2 instances of echo1
-    Assert.assertEquals(1, echo2Resources.size());
-    // TODO: check cores after hadoop-2.1.0
-    for (TwillRunResources resources : echo2Resources) {
-      Assert.assertEquals(256, resources.getMemoryMB());
-    }
-
-    // Decrease number of instances of echo1 from 2 to 1
-    controller.changeInstances("echo1", 1);
-    echoServices = controller.discoverService("echo1");
-    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 1, 60));
-    report = controller.getResourceReport();
-
-    // make sure resources for echo1 and echo2 are there
-    usedResources = report.getResources();
-    Assert.assertEquals(2, usedResources.keySet().size());
-    Assert.assertTrue(usedResources.containsKey("echo1"));
-    Assert.assertTrue(usedResources.containsKey("echo2"));
-
-    echo1Resources = usedResources.get("echo1");
-    // 1 instance of echo1 now
-    Assert.assertEquals(1, echo1Resources.size());
-    // TODO: check cores after hadoop-2.1.0
-    for (TwillRunResources resources : echo1Resources) {
-      Assert.assertEquals(128, resources.getMemoryMB());
-    }
-
-    echo2Resources = usedResources.get("echo2");
-    // 2 instances of echo1
-    Assert.assertEquals(1, echo2Resources.size());
-    // TODO: check cores after hadoop-2.1.0
-    for (TwillRunResources resources : echo2Resources) {
-      Assert.assertEquals(256, resources.getMemoryMB());
-    }
-
-    controller.stop().get(30, TimeUnit.SECONDS);
-    // Sleep a bit before exiting.
-    TimeUnit.SECONDS.sleep(2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
deleted file mode 100644
index 5148ed2..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.TwillContext;
-import org.apache.twill.api.TwillContext;
-import org.apache.twill.common.Cancellable;
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.util.List;
-
-/**
- * Boilerplate for a server that announces itself and talks to clients through a socket.
- */
-public abstract class SocketServer extends AbstractTwillRunnable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SocketServer.class);
-
-  protected volatile boolean running;
-  protected volatile Thread runThread;
-  protected ServerSocket serverSocket;
-  protected Cancellable canceller;
-
-  @Override
-  public void initialize(TwillContext context) {
-    super.initialize(context);
-    running = true;
-    try {
-      serverSocket = new ServerSocket(0);
-      LOG.info("Server started: " + serverSocket.getLocalSocketAddress() +
-               ", id: " + context.getInstanceId() +
-               ", count: " + context.getInstanceCount());
-
-      final List<Cancellable> cancellables = ImmutableList.of(
-        context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()),
-        context.announce(context.getArguments()[0], serverSocket.getLocalPort())
-      );
-      canceller = new Cancellable() {
-        @Override
-        public void cancel() {
-          for (Cancellable c : cancellables) {
-            c.cancel();
-          }
-        }
-      };
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public void run() {
-    try {
-      runThread = Thread.currentThread();
-      while (running) {
-        try {
-          Socket socket = serverSocket.accept();
-          try {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
-            PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
-            handleRequest(reader, writer);
-          } finally {
-            socket.close();
-          }
-        } catch (SocketException e) {
-          LOG.info("Socket exception: " + e);
-        }
-      }
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void stop() {
-    LOG.info("Stopping server");
-    canceller.cancel();
-    running = false;
-    Thread t = runThread;
-    if (t != null) {
-      t.interrupt();
-    }
-    try {
-      serverSocket.close();
-    } catch (IOException e) {
-      LOG.error("Exception while closing socket.", e);
-      throw Throwables.propagate(e);
-    }
-    serverSocket = null;
-  }
-
-  @Override
-  public void destroy() {
-    try {
-      if (serverSocket != null) {
-        serverSocket.close();
-      }
-    } catch (IOException e) {
-      LOG.error("Exception while closing socket.", e);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  abstract public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
deleted file mode 100644
index 5a93271..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Service;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.PrintWriter;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Testing application master will shutdown itself when all tasks are completed.
- * This test is executed by {@link YarnTestSuite}.
- */
-public class TaskCompletedTestRun {
-
-  public static final class SleepTask extends AbstractTwillRunnable {
-
-    @Override
-    public void run() {
-      // Randomly sleep for 3-5 seconds.
-      try {
-        TimeUnit.SECONDS.sleep(new Random().nextInt(3) + 3);
-      } catch (InterruptedException e) {
-        throw Throwables.propagate(e);
-      }
-    }
-
-    @Override
-    public void stop() {
-      // No-op
-    }
-  }
-
-  @Test
-  public void testTaskCompleted() throws InterruptedException {
-    TwillRunner twillRunner = YarnTestSuite.getTwillRunner();
-    TwillController controller = twillRunner.prepare(new SleepTask(),
-                                                ResourceSpecification.Builder.with()
-                                                  .setVirtualCores(1)
-                                                  .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
-                                                  .setInstances(3).build())
-                                            .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-                                            .start();
-
-    final CountDownLatch runLatch = new CountDownLatch(1);
-    final CountDownLatch stopLatch = new CountDownLatch(1);
-    controller.addListener(new ServiceListenerAdapter() {
-
-      @Override
-      public void running() {
-        runLatch.countDown();
-      }
-
-      @Override
-      public void terminated(Service.State from) {
-        stopLatch.countDown();
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    Assert.assertTrue(runLatch.await(1, TimeUnit.MINUTES));
-
-    Assert.assertTrue(stopLatch.await(1, TimeUnit.MINUTES));
-
-    TimeUnit.SECONDS.sleep(2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java b/yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
deleted file mode 100644
index 8be907b..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.TwillSpecification;
-import com.google.common.collect.ImmutableSet;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- *
- */
-public class TwillSpecificationTest {
-
-  /**
-   * Dummy for test.
-   */
-  public static final class DummyRunnable extends AbstractTwillRunnable {
-
-    @Override
-    public void stop() {
-      // no-op
-    }
-
-    @Override
-    public void run() {
-      // no-op
-    }
-  }
-
-  @Test
-  public void testAnyOrder() {
-    TwillSpecification spec =
-      TwillSpecification.Builder.with()
-        .setName("Testing")
-        .withRunnable()
-        .add("r1", new DummyRunnable()).noLocalFiles()
-        .add("r2", new DummyRunnable()).noLocalFiles()
-        .add("r3", new DummyRunnable()).noLocalFiles()
-        .anyOrder()
-        .build();
-
-    Assert.assertEquals(3, spec.getRunnables().size());
-    List<TwillSpecification.Order> orders = spec.getOrders();
-    Assert.assertEquals(1, orders.size());
-    Assert.assertEquals(ImmutableSet.of("r1", "r2", "r3"), orders.get(0).getNames());
-  }
-
-  @Test
-  public void testOrder() {
-    TwillSpecification spec =
-      TwillSpecification.Builder.with()
-        .setName("Testing")
-        .withRunnable()
-        .add("r1", new DummyRunnable()).noLocalFiles()
-        .add("r2", new DummyRunnable()).noLocalFiles()
-        .add("r3", new DummyRunnable()).noLocalFiles()
-        .add("r4", new DummyRunnable()).noLocalFiles()
-        .withOrder().begin("r1", "r2").nextWhenStarted("r3")
-        .build();
-
-    Assert.assertEquals(4, spec.getRunnables().size());
-    List<TwillSpecification.Order> orders = spec.getOrders();
-    Assert.assertEquals(3, orders.size());
-    Assert.assertEquals(ImmutableSet.of("r1", "r2"), orders.get(0).getNames());
-    Assert.assertEquals(ImmutableSet.of("r3"), orders.get(1).getNames());
-    Assert.assertEquals(ImmutableSet.of("r4"), orders.get(2).getNames());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
deleted file mode 100644
index b55d620..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.TwillRunnerService;
-import org.apache.twill.filesystem.LocalLocationFactory;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.internal.yarn.YarnUtils;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test suite for all tests with mini yarn cluster.
- */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-                      EchoServerTestRun.class,
-                      ResourceReportTestRun.class,
-                      TaskCompletedTestRun.class,
-                      DistributeShellTestRun.class,
-                      LocalFileTestRun.class,
-                      FailureRestartTestRun.class,
-                      ProvisionTimeoutTestRun.class
-                    })
-public class YarnTestSuite {
-  private static final Logger LOG = LoggerFactory.getLogger(YarnTestSuite.class);
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  private static InMemoryZKServer zkServer;
-  private static MiniYARNCluster cluster;
-  private static TwillRunnerService runnerService;
-  private static YarnConfiguration config;
-
-  @BeforeClass
-  public static final void init() throws IOException {
-    // Starts Zookeeper
-    zkServer = InMemoryZKServer.builder().build();
-    zkServer.startAndWait();
-
-    // Start YARN mini cluster
-    config = new YarnConfiguration(new Configuration());
-
-    if (YarnUtils.isHadoop20()) {
-      config.set("yarn.resourcemanager.scheduler.class",
-                 "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
-    } else {
-      config.set("yarn.resourcemanager.scheduler.class",
-                 "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
-      config.set("yarn.scheduler.capacity.resource-calculator",
-                 "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
-    }
-    config.set("yarn.minicluster.fixed.ports", "true");
-    config.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
-    config.set("yarn.nodemanager.vmem-check-enabled", "false");
-    config.set("yarn.scheduler.minimum-allocation-mb", "128");
-    config.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
-
-    cluster = new MiniYARNCluster("test-cluster", 1, 1, 1);
-    cluster.init(config);
-    cluster.start();
-
-    runnerService = createTwillRunnerService();
-    runnerService.startAndWait();
-  }
-
-  @AfterClass
-  public static final void finish() {
-    runnerService.stopAndWait();
-    cluster.stop();
-    zkServer.stopAndWait();
-  }
-
-  public static final TwillRunner getTwillRunner() {
-    return runnerService;
-  }
-
-  /**
-   * Creates an unstarted instance of {@link org.apache.twill.api.TwillRunnerService}.
-   */
-  public static final TwillRunnerService createTwillRunnerService() throws IOException {
-    return new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill",
-                                      new LocalLocationFactory(tmpFolder.newFolder()));
-  }
-
-  public static final <T> boolean waitForSize(Iterable<T> iterable, int count, int limit) throws InterruptedException {
-    int trial = 0;
-    int size = Iterables.size(iterable);
-    while (size != count && trial < limit) {
-      LOG.info("Waiting for {} size {} == {}", iterable, size, count);
-      TimeUnit.SECONDS.sleep(1);
-      trial++;
-      size = Iterables.size(iterable);
-    }
-    return trial < limit;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/resources/header.txt
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/header.txt b/yarn/src/test/resources/header.txt
deleted file mode 100644
index b6e25e6..0000000
--- a/yarn/src/test/resources/header.txt
+++ /dev/null
@@ -1 +0,0 @@
-Local file header

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/logback-test.xml b/yarn/src/test/resources/logback-test.xml
deleted file mode 100644
index 2615cb4..0000000
--- a/yarn/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!-- Default logback configuration for twill library -->
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="org.apache.twill" level="DEBUG" />
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-
-</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/zookeeper/pom.xml b/zookeeper/pom.xml
deleted file mode 100644
index e76ee50..0000000
--- a/zookeeper/pom.xml
+++ /dev/null
@@ -1,67 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>twill-parent</artifactId>
-        <groupId>org.apache.twill</groupId>
-        <version>0.1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>twill-zookeeper</artifactId>
-    <name>Twill ZooKeeper client library</name>
-
-    <dependencies>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.findbugs</groupId>
-            <artifactId>jsr305</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
deleted file mode 100644
index 9e4f55f..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.zookeeper.NodeChildren;
-import com.google.common.base.Objects;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.List;
-
-/**
- *
- */
-final class BasicNodeChildren implements NodeChildren {
-
-  private final Stat stat;
-  private final List<String> children;
-
-  BasicNodeChildren(List<String> children, Stat stat) {
-    this.stat = stat;
-    this.children = children;
-  }
-
-  @Override
-  public Stat getStat() {
-    return stat;
-  }
-
-  @Override
-  public List<String> getChildren() {
-    return children;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || !(o instanceof NodeChildren)) {
-      return false;
-    }
-
-    NodeChildren that = (NodeChildren) o;
-    return stat.equals(that.getStat()) && children.equals(that.getChildren());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(children, stat);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
deleted file mode 100644
index 98a3a66..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.zookeeper.NodeData;
-import com.google.common.base.Objects;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.Arrays;
-
-/**
- * A straightforward implementation for {@link NodeData}.
- */
-final class BasicNodeData implements NodeData {
-
-  private final byte[] data;
-  private final Stat stat;
-
-  BasicNodeData(byte[] data, Stat stat) {
-    this.data = data;
-    this.stat = stat;
-  }
-
-  @Override
-  public Stat getStat() {
-    return stat;
-  }
-
-  @Override
-  public byte[] getData() {
-    return data;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || !(o instanceof NodeData)) {
-      return false;
-    }
-
-    BasicNodeData that = (BasicNodeData) o;
-
-    return stat.equals(that.getStat()) && Arrays.equals(data, that.getData());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(data, stat);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
deleted file mode 100644
index c52fb08..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ /dev/null
@@ -1,525 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClientService;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The base implementation of {@link ZKClientService}.
- */
-public final class DefaultZKClientService implements ZKClientService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DefaultZKClientService.class);
-
-  private final String zkStr;
-  private final int sessionTimeout;
-  private final List<Watcher> connectionWatchers;
-  private final AtomicReference<ZooKeeper> zooKeeper;
-  private final Function<String, List<ACL>> aclMapper;
-  private final Service serviceDelegate;
-  private ExecutorService eventExecutor;
-
-  public DefaultZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher) {
-    this.zkStr = zkStr;
-    this.sessionTimeout = sessionTimeout;
-    this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
-    addConnectionWatcher(connectionWatcher);
-
-    this.zooKeeper = new AtomicReference<ZooKeeper>();
-
-    // TODO (terence): Add ACL
-    aclMapper = new Function<String, List<ACL>>() {
-      @Override
-      public List<ACL> apply(String input) {
-        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
-      }
-    };
-    serviceDelegate = new ServiceDelegate();
-  }
-
-  @Override
-  public Long getSessionId() {
-    ZooKeeper zk = zooKeeper.get();
-    return zk == null ? null : zk.getSessionId();
-  }
-
-  @Override
-  public String getConnectString() {
-    return zkStr;
-  }
-
-  @Override
-  public void addConnectionWatcher(Watcher watcher) {
-    if (watcher != null) {
-      connectionWatchers.add(wrapWatcher(watcher));
-    }
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
-    return create(path, data, createMode, true);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data,
-                                        CreateMode createMode, boolean createParent) {
-    return doCreate(path, data, createMode, createParent, false);
-  }
-
-  private OperationFuture<String> doCreate(final String path,
-                                        @Nullable final byte[] data,
-                                        final CreateMode createMode,
-                                        final boolean createParent,
-                                        final boolean ignoreNodeExists) {
-    final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().create(path, data, aclMapper.apply(path), createMode, Callbacks.STRING, createFuture);
-    if (!createParent) {
-      return createFuture;
-    }
-
-    // If create parent is request, return a different future
-    final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
-    // Watch for changes in the original future
-    Futures.addCallback(createFuture, new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String path) {
-        // Propagate if creation was successful
-        result.set(path);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // See if the failure can be handled
-        if (updateFailureResult(t, result, path, ignoreNodeExists)) {
-          return;
-        }
-        // Create the parent node
-        String parentPath = getParent(path);
-        if (parentPath.isEmpty()) {
-          result.setException(t);
-          return;
-        }
-        // Watch for parent creation complete
-        Futures.addCallback(
-          doCreate(parentPath, null, CreateMode.PERSISTENT, createParent, true), new FutureCallback<String>() {
-          @Override
-          public void onSuccess(String parentPath) {
-            // Create the requested path again
-            Futures.addCallback(
-              doCreate(path, data, createMode, false, ignoreNodeExists), new FutureCallback<String>() {
-              @Override
-              public void onSuccess(String pathResult) {
-                result.set(pathResult);
-              }
-
-              @Override
-              public void onFailure(Throwable t) {
-                // handle the failure
-                updateFailureResult(t, result, path, ignoreNodeExists);
-              }
-            });
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            result.setException(t);
-          }
-        });
-      }
-
-      /**
-       * Updates the result future based on the given {@link Throwable}.
-       * @param t Cause of the failure
-       * @param result Future to be updated
-       * @param path Request path for the operation
-       * @return {@code true} if it is a failure, {@code false} otherwise.
-       */
-      private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
-                                          String path, boolean ignoreNodeExists) {
-        // Propagate if there is error
-        if (!(t instanceof KeeperException)) {
-          result.setException(t);
-          return true;
-        }
-        KeeperException.Code code = ((KeeperException) t).code();
-        // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
-        if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
-          // The requested path could be used because it only applies to non-sequential node
-          result.set(path);
-          return false;
-        }
-        if (code != KeeperException.Code.NONODE) {
-          result.setException(t);
-          return true;
-        }
-        return false;
-      }
-
-      /**
-       * Gets the parent of the given path.
-       * @param path Path for computing its parent
-       * @return Parent of the given path, or empty string if the given path is the root path already.
-       */
-      private String getParent(String path) {
-        String parentPath = path.substring(0, path.lastIndexOf('/'));
-        return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
-      }
-    });
-
-    return result;
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path) {
-    return exists(path, null);
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path, Watcher watcher) {
-    SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
-    return result;
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return getChildren(path, null);
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
-    SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
-    return result;
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return getData(path, null);
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path, Watcher watcher) {
-    SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
-
-    return result;
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return setData(path, data, -1);
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
-    SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
-    getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
-    return result;
-  }
-
-  @Override
-  public OperationFuture<String> delete(String path) {
-    return delete(path, -1);
-  }
-
-  @Override
-  public OperationFuture<String> delete(String deletePath, int version) {
-    SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
-    getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
-    return result;
-  }
-
-  @Override
-  public Supplier<ZooKeeper> getZooKeeperSupplier() {
-    return new Supplier<ZooKeeper>() {
-      @Override
-      public ZooKeeper get() {
-        return getZooKeeper();
-      }
-    };
-  }
-
-  @Override
-  public ListenableFuture<State> start() {
-    return serviceDelegate.start();
-  }
-
-  @Override
-  public State startAndWait() {
-    return serviceDelegate.startAndWait();
-  }
-
-  @Override
-  public boolean isRunning() {
-    return serviceDelegate.isRunning();
-  }
-
-  @Override
-  public State state() {
-    return serviceDelegate.state();
-  }
-
-  @Override
-  public ListenableFuture<State> stop() {
-    return serviceDelegate.stop();
-  }
-
-  @Override
-  public State stopAndWait() {
-    return serviceDelegate.stopAndWait();
-  }
-
-  @Override
-  public void addListener(Listener listener, Executor executor) {
-    serviceDelegate.addListener(listener, executor);
-  }
-
-  /**
-   * @return Current {@link ZooKeeper} client.
-   */
-  private ZooKeeper getZooKeeper() {
-    ZooKeeper zk = zooKeeper.get();
-    Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
-    return zk;
-  }
-
-  /**
-   * Wraps the given watcher to be called from the event executor.
-   * @param watcher Watcher to be wrapped
-   * @return The wrapped Watcher
-   */
-  private Watcher wrapWatcher(final Watcher watcher) {
-    if (watcher == null) {
-      return null;
-    }
-    return new Watcher() {
-      @Override
-      public void process(final WatchedEvent event) {
-        eventExecutor.execute(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              watcher.process(event);
-            } catch (Throwable t) {
-              LOG.error("Watcher throws exception.", t);
-            }
-          }
-        });
-      }
-    };
-  }
-
-  private final class ServiceDelegate extends AbstractService implements Watcher {
-
-    @Override
-    protected void doStart() {
-      // A single thread executor
-      eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
-                                             Threads.createDaemonThreadFactory("zk-client-EventThread")) {
-        @Override
-        protected void terminated() {
-          super.terminated();
-          notifyStopped();
-        }
-      };
-
-      try {
-        zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, this));
-      } catch (IOException e) {
-        notifyFailed(e);
-      }
-    }
-
-    @Override
-    protected void doStop() {
-      ZooKeeper zk = zooKeeper.getAndSet(null);
-      if (zk != null) {
-        try {
-          zk.close();
-        } catch (InterruptedException e) {
-          notifyFailed(e);
-        } finally {
-          eventExecutor.shutdown();
-        }
-      }
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-      try {
-        if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
-          LOG.info("Connected to ZooKeeper: " + zkStr);
-          notifyStarted();
-          return;
-        }
-        if (event.getState() == Event.KeeperState.Expired) {
-          LOG.info("ZooKeeper session expired: " + zkStr);
-
-          // When connection expired, simply reconnect again
-          Thread t = new Thread(new Runnable() {
-            @Override
-            public void run() {
-              try {
-                zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, ServiceDelegate.this));
-              } catch (IOException e) {
-                zooKeeper.set(null);
-                notifyFailed(e);
-              }
-            }
-          }, "zk-reconnect");
-          t.setDaemon(true);
-          t.start();
-        }
-      } finally {
-        if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) {
-          for (Watcher connectionWatcher : connectionWatchers) {
-            connectionWatcher.process(event);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Collection of generic callbacks that simply reflect results into OperationFuture.
-   */
-  private static final class Callbacks {
-    static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, String name) {
-        SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set((name == null || name.isEmpty()) ? path : name);
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, Stat stat) {
-        SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(stat);
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    /**
-     * A stat callback that treats NONODE as success.
-     */
-    static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, Stat stat) {
-        SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
-          result.set(stat);
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(new BasicNodeChildren(children, stat));
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-        SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(new BasicNodeData(data, stat));
-          return;
-        }
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-
-    static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public void processResult(int rc, String path, Object ctx) {
-        SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
-        KeeperException.Code code = KeeperException.Code.get(rc);
-        if (code == KeeperException.Code.OK) {
-          result.set(result.getRequestPath());
-          return;
-        }
-        // Otherwise, it is an error
-        result.setException(KeeperException.create(code, result.getRequestPath()));
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
deleted file mode 100644
index 65ceadb..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.ForwardingZKClient;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.RetryStrategy;
-import org.apache.twill.zookeeper.RetryStrategy.OperationType;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A {@link ZKClient} that will invoke {@link RetryStrategy} on operation failure.
- * This {@link ZKClient} works by delegating calls to another {@link ZKClient}
- * and listen for the result. If the result is a failure, and is
- * {@link RetryUtils#canRetry(org.apache.zookeeper.KeeperException.Code) retryable}, the given {@link RetryStrategy}
- * will be called to determine the next retry time, or give up, depending on the value returned by the strategy.
- */
-public final class FailureRetryZKClient extends ForwardingZKClient {
-
-  private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(
-                                                                Threads.createDaemonThreadFactory("retry-zkclient"));
-  private final RetryStrategy retryStrategy;
-
-  public FailureRetryZKClient(ZKClient delegate, RetryStrategy retryStrategy) {
-    super(delegate);
-    this.retryStrategy = retryStrategy;
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
-    return create(path, data, createMode, true);
-  }
-
-  @Override
-  public OperationFuture<String> create(final String path, final byte[] data,
-                                        final CreateMode createMode, final boolean createParent) {
-
-    // No retry for any SEQUENTIAL node, as some algorithms depends on only one sequential node being created.
-    if (createMode == CreateMode.PERSISTENT_SEQUENTIAL || createMode == CreateMode.EPHEMERAL_SEQUENTIAL) {
-      return super.create(path, data, createMode, createParent);
-    }
-
-    final SettableOperationFuture<String> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
-    Futures.addCallback(super.create(path, data, createMode, createParent),
-                        new OperationFutureCallback<String>(OperationType.CREATE, System.currentTimeMillis(),
-                                                            path, result, new Supplier<OperationFuture<String>>() {
-                          @Override
-                          public OperationFuture<String> get() {
-                            return FailureRetryZKClient.super.create(path, data, createMode, createParent);
-                          }
-                        }));
-    return result;
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path) {
-    return exists(path, null);
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(final String path, final Watcher watcher) {
-    final SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
-    Futures.addCallback(super.exists(path, watcher),
-                        new OperationFutureCallback<Stat>(OperationType.EXISTS, System.currentTimeMillis(),
-                                                          path, result, new Supplier<OperationFuture<Stat>>() {
-                          @Override
-                          public OperationFuture<Stat> get() {
-                            return FailureRetryZKClient.super.exists(path, watcher);
-                          }
-                        }));
-    return result;
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return getChildren(path, null);
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(final String path, final Watcher watcher) {
-    final SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path,
-                                                                                        Threads.SAME_THREAD_EXECUTOR);
-    Futures.addCallback(super.getChildren(path, watcher),
-                        new OperationFutureCallback<NodeChildren>(OperationType.GET_CHILDREN,
-                                                                  System.currentTimeMillis(), path, result,
-                                                                  new Supplier<OperationFuture<NodeChildren>>() {
-                          @Override
-                          public OperationFuture<NodeChildren> get() {
-                            return FailureRetryZKClient.super.getChildren(path, watcher);
-                          }
-                        }));
-    return result;
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return getData(path, null);
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(final String path, final Watcher watcher) {
-    final SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
-    Futures.addCallback(super.getData(path, watcher),
-                        new OperationFutureCallback<NodeData>(OperationType.GET_DATA, System.currentTimeMillis(),
-                                                              path, result, new Supplier<OperationFuture<NodeData>>() {
-                          @Override
-                          public OperationFuture<NodeData> get() {
-                            return FailureRetryZKClient.super.getData(path, watcher);
-                          }
-                        }));
-    return result;
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return setData(path, data, -1);
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(final String dataPath, final byte[] data, final int version) {
-    final SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, Threads.SAME_THREAD_EXECUTOR);
-    Futures.addCallback(super.setData(dataPath, data, version),
-                        new OperationFutureCallback<Stat>(OperationType.SET_DATA, System.currentTimeMillis(),
-                                                          dataPath, result, new Supplier<OperationFuture<Stat>>() {
-                          @Override
-                          public OperationFuture<Stat> get() {
-                            return FailureRetryZKClient.super.setData(dataPath, data, version);
-                          }
-                        }));
-    return result;
-  }
-
-  @Override
-  public OperationFuture<String> delete(String path) {
-    return delete(path, -1);
-  }
-
-  @Override
-  public OperationFuture<String> delete(final String deletePath, final int version) {
-    final SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath,
-                                                                                  Threads.SAME_THREAD_EXECUTOR);
-    Futures.addCallback(super.delete(deletePath, version),
-                        new OperationFutureCallback<String>(OperationType.DELETE, System.currentTimeMillis(),
-                                                            deletePath, result, new Supplier<OperationFuture<String>>
-                          () {
-                          @Override
-                          public OperationFuture<String> get() {
-                            return FailureRetryZKClient.super.delete(deletePath, version);
-                          }
-                        }));
-    return result;
-  }
-
-  /**
-   * Callback to watch for operation result and trigger retry if necessary.
-   * @param <V> Type of operation result.
-   */
-  private final class OperationFutureCallback<V> implements FutureCallback<V> {
-
-    private final OperationType type;
-    private final long startTime;
-    private final String path;
-    private final SettableOperationFuture<V> result;
-    private final Supplier<OperationFuture<V>> retryAction;
-    private final AtomicInteger failureCount;
-
-    private OperationFutureCallback(OperationType type, long startTime, String path,
-                                    SettableOperationFuture<V> result, Supplier<OperationFuture<V>> retryAction) {
-      this.type = type;
-      this.startTime = startTime;
-      this.path = path;
-      this.result = result;
-      this.retryAction = retryAction;
-      this.failureCount = new AtomicInteger(0);
-    }
-
-    @Override
-    public void onSuccess(V result) {
-      this.result.set(result);
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      if (!doRetry(t)) {
-        result.setException(t);
-      }
-    }
-
-    private boolean doRetry(Throwable t) {
-      if (!RetryUtils.canRetry(t)) {
-        return false;
-      }
-
-      // Determine the relay delay
-      long nextRetry = retryStrategy.nextRetry(failureCount.incrementAndGet(), startTime, type, path);
-      if (nextRetry < 0) {
-        return false;
-      }
-
-      // Schedule the retry.
-      SCHEDULER.schedule(new Runnable() {
-        @Override
-        public void run() {
-          Futures.addCallback(retryAction.get(), OperationFutureCallback.this);
-        }
-      }, nextRetry, TimeUnit.MILLISECONDS);
-
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
deleted file mode 100644
index c4eed59..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public final class InMemoryZKServer implements Service {
-
-  private static final Logger LOG = LoggerFactory.getLogger(InMemoryZKServer.class);
-
-  private final File dataDir;
-  private final int tickTime;
-  private final boolean autoClean;
-  private final int port;
-  private final Service delegateService = new AbstractIdleService() {
-    @Override
-    protected void startUp() throws Exception {
-      ZooKeeperServer zkServer = new ZooKeeperServer();
-      FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
-      zkServer.setTxnLogFactory(ftxn);
-      zkServer.setTickTime(tickTime);
-
-      factory = ServerCnxnFactory.createFactory();
-      factory.configure(getAddress(port), -1);
-      factory.startup(zkServer);
-
-      LOG.info("In memory ZK started: " + getConnectionStr());
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      try {
-        factory.shutdown();
-      } finally {
-        if (autoClean) {
-          cleanDir(dataDir);
-        }
-      }
-    }
-  };
-
-  private ServerCnxnFactory factory;
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  private InMemoryZKServer(File dataDir, int tickTime, boolean autoClean, int port) {
-    if (dataDir == null) {
-      dataDir = Files.createTempDir();
-      autoClean = true;
-    } else {
-      Preconditions.checkArgument(dataDir.isDirectory() || dataDir.mkdirs() || dataDir.isDirectory());
-    }
-
-    this.dataDir = dataDir;
-    this.tickTime = tickTime;
-    this.autoClean = autoClean;
-    this.port = port;
-  }
-
-  public String getConnectionStr() {
-    InetSocketAddress addr = factory.getLocalAddress();
-    return String.format("%s:%d", addr.getHostName(), addr.getPort());
-  }
-
-  public InetSocketAddress getLocalAddress() {
-    return factory.getLocalAddress();
-  }
-
-  private InetSocketAddress getAddress(int port) {
-    try {
-//      return new InetSocketAddress(InetAddress.getByAddress(new byte[] {127, 0, 0, 1}), port < 0 ? 0 : port);
-      return new InetSocketAddress(InetAddress.getLocalHost(), port < 0 ? 0 : port);
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private void cleanDir(File dir) {
-    File[] files = dir.listFiles();
-    if (files == null) {
-      return;
-    }
-    for (File file : files) {
-      if (file.isDirectory()) {
-        cleanDir(file);
-      }
-      file.delete();
-    }
-  }
-
-  @Override
-  public ListenableFuture<State> start() {
-    return delegateService.start();
-  }
-
-  @Override
-  public State startAndWait() {
-    return delegateService.startAndWait();
-  }
-
-  @Override
-  public boolean isRunning() {
-    return delegateService.isRunning();
-  }
-
-  @Override
-  public State state() {
-    return delegateService.state();
-  }
-
-  @Override
-  public ListenableFuture<State> stop() {
-    return delegateService.stop();
-  }
-
-  @Override
-  public State stopAndWait() {
-    return delegateService.stopAndWait();
-  }
-
-  @Override
-  public void addListener(Listener listener, Executor executor) {
-    delegateService.addListener(listener, executor);
-  }
-
-  /**
-   * Builder for creating instance of {@link InMemoryZKServer}.
-   */
-  public static final class Builder {
-    private File dataDir;
-    private boolean autoCleanDataDir = false;
-    private int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
-    private int port = -1;
-
-    public Builder setDataDir(File dataDir) {
-      this.dataDir = dataDir;
-      return this;
-    }
-
-    public Builder setAutoCleanDataDir(boolean auto) {
-      this.autoCleanDataDir = auto;
-      return this;
-    }
-
-    public Builder setTickTime(int tickTime) {
-      this.tickTime = tickTime;
-      return this;
-    }
-
-    public Builder setPort(int port) {
-      this.port = port;
-      return this;
-    }
-
-    public InMemoryZKServer build() {
-      return new InMemoryZKServer(dataDir, tickTime, autoCleanDataDir, port);
-    }
-
-    private Builder() {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
deleted file mode 100644
index bc01f08..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import com.google.common.base.Preconditions;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility class for killing ZK client to simulate failures during testing.
- */
-public final class KillZKSession {
-
-  /**
-   * Utility classes should have a public constructor or a default constructor
-   * hence made it private.
-   */
-  private KillZKSession() {}
-
-  /**
-   * Kills a Zookeeper client to simulate failure scenarious during testing.
-   * Callee will provide the amount of time to wait before it's considered failure
-   * to kill a client.
-   *
-   * @param client that needs to be killed.
-   * @param connectionString of Quorum
-   * @param maxMs time in millisecond specifying the max time to kill a client.
-   * @throws IOException When there is IO error
-   * @throws InterruptedException When call has been interrupted.
-   */
-  public static void kill(ZooKeeper client, String connectionString,
-                          int maxMs) throws IOException, InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-    ZooKeeper zk = new ZooKeeper(connectionString, maxMs, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getState() == Event.KeeperState.SyncConnected) {
-          latch.countDown();
-        }
-      }
-    }, client.getSessionId(), client.getSessionPasswd());
-
-    try {
-      Preconditions.checkState(latch.await(maxMs, TimeUnit.MILLISECONDS), "Fail to kill ZK connection.");
-    } finally {
-      zk.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
deleted file mode 100644
index 1a82e4b..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.ForwardingZKClient;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link ZKClient} that namespace every paths.
- */
-public final class NamespaceZKClient extends ForwardingZKClient {
-  // This class extends from ForwardingZKClient but overrides every method is for letting the
-  // ZKClientServices delegate logic works.
-
-  private final String namespace;
-  private final ZKClient delegate;
-  private final String connectString;
-
-  public NamespaceZKClient(ZKClient delegate, String namespace) {
-    super(delegate);
-    this.namespace = namespace;
-    this.delegate = delegate;
-    this.connectString = delegate.getConnectString() + namespace;
-  }
-
-  @Override
-  public Long getSessionId() {
-    return delegate.getSessionId();
-  }
-
-  @Override
-  public String getConnectString() {
-    return connectString;
-  }
-
-  @Override
-  public void addConnectionWatcher(Watcher watcher) {
-    delegate.addConnectionWatcher(watcher);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
-    return relayPath(delegate.create(namespace + path, data, createMode), this.<String>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
-                                        boolean createParent) {
-    return relayPath(delegate.create(namespace + path, data, createMode, createParent),
-                     this.<String>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path) {
-    return relayFuture(delegate.exists(namespace + path), this.<Stat>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
-    return relayFuture(delegate.exists(namespace + path, watcher), this.<Stat>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return relayFuture(delegate.getChildren(namespace + path), this.<NodeChildren>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
-    return relayFuture(delegate.getChildren(namespace + path, watcher), this.<NodeChildren>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return relayFuture(delegate.getData(namespace + path), this.<NodeData>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
-    return relayFuture(delegate.getData(namespace + path, watcher), this.<NodeData>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return relayFuture(delegate.setData(namespace + path, data), this.<Stat>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
-    return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
-  }
-
-  @Override
-  public OperationFuture<String> delete(String path) {
-    return relayPath(delegate.delete(namespace + path), this.<String>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<String> delete(String deletePath, int version) {
-    return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
-  }
-
-  private <V> SettableOperationFuture<V> createFuture(String path) {
-    return SettableOperationFuture.create(namespace + path, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-  private <V> OperationFuture<V> relayFuture(final OperationFuture<V> from, final SettableOperationFuture<V> to) {
-    Futures.addCallback(from, new FutureCallback<V>() {
-      @Override
-      public void onSuccess(V result) {
-        to.set(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        to.setException(t);
-      }
-    });
-    return to;
-  }
-
-  private OperationFuture<String> relayPath(final OperationFuture<String> from,
-                                            final SettableOperationFuture<String> to) {
-    from.addListener(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          String path = from.get();
-          to.set(path.substring(namespace.length()));
-        } catch (Exception e) {
-          to.setException(e.getCause());
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-    return to;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
deleted file mode 100644
index fb42491..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Utility class for help determining operation retry condition.
- */
-final class RetryUtils {
-
-  /**
-   * Tells if a given operation error code can be retried or not.
-   * @param code The error code of the operation.
-   * @return {@code true} if the operation can be retried.
-   */
-  public static boolean canRetry(KeeperException.Code code) {
-    return (code == KeeperException.Code.CONNECTIONLOSS
-          || code == KeeperException.Code.OPERATIONTIMEOUT
-          || code == KeeperException.Code.SESSIONEXPIRED
-          || code == KeeperException.Code.SESSIONMOVED);
-  }
-
-  /**
-   * Tells if a given operation exception can be retried or not.
-   * @param t The exception raised by an operation.
-   * @return {@code true} if the operation can be retried.
-   */
-  public static boolean canRetry(Throwable t) {
-    return t instanceof KeeperException && canRetry(((KeeperException) t).code());
-  }
-
-  private RetryUtils() {
-  }
-}