You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:18 UTC

[02/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java
deleted file mode 100644
index 181ca2b..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicMarkableReference;
-
-/**
- * A wrapper for {@link Watcher} that will re-set the watch automatically until it is successful.
- */
-final class RewatchOnExpireWatcher implements Watcher {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RewatchOnExpireWatcher.class);
-
-  enum ActionType {
-    EXISTS,
-    CHILDREN,
-    DATA
-  }
-
-  private final ZKClient client;
-  private final ActionType actionType;
-  private final String path;
-  private final Watcher delegate;
-  private final AtomicMarkableReference<Object> lastResult;
-
-  RewatchOnExpireWatcher(ZKClient client, ActionType actionType, String path, Watcher delegate) {
-    this.client = client;
-    this.actionType = actionType;
-    this.path = path;
-    this.delegate = delegate;
-    this.lastResult = new AtomicMarkableReference<Object>(null, false);
-  }
-
-  /**
-   * Sets the result from the operation that causes this watcher to be set.
-   */
-  void setLastResult(Object result) {
-    lastResult.compareAndSet(null, result, false, true);
-  }
-
-  @Override
-  public void process(WatchedEvent event) {
-    if (delegate != null && event.getType() != Event.EventType.None) {
-      try {
-        delegate.process(event);
-      } catch (Throwable t) {
-        LOG.error("Watcher throws exception.", t);
-      }
-    }
-
-    if (event.getState() != Event.KeeperState.Expired) {
-      return;
-    }
-    switch (actionType) {
-      case EXISTS:
-        exists();
-        break;
-      case CHILDREN:
-        children();
-        break;
-      case DATA:
-        data();
-        break;
-    }
-  }
-
-  private void exists() {
-    Futures.addCallback(client.exists(path, this), new FutureCallback<Stat>() {
-      @Override
-      public void onSuccess(Stat stat) {
-        // Since we know all callbacks and watcher are triggered from single event thread, there is no race condition.
-        Object oldResult = lastResult.getReference();
-        lastResult.compareAndSet(oldResult, null, true, false);
-
-        if (stat != oldResult && (stat == null || !stat.equals(oldResult))) {
-          if (stat == null) {
-            // previous stat is not null, means node deleted
-            process(new WatchedEvent(Event.EventType.NodeDeleted, Event.KeeperState.SyncConnected, path));
-          } else if (oldResult == null) {
-            // previous stat is null, means node created
-            process(new WatchedEvent(Event.EventType.NodeCreated, Event.KeeperState.SyncConnected, path));
-          } else {
-            // Otherwise, something changed on the node
-            process(new WatchedEvent(Event.EventType.NodeDataChanged, Event.KeeperState.SyncConnected, path));
-          }
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        if (RetryUtils.canRetry(t)) {
-          exists();
-        } else {
-          lastResult.set(null, false);
-          LOG.error("Fail to re-set watch on exists for path " + path, t);
-        }
-      }
-    });
-  }
-
-  private void children() {
-    Futures.addCallback(client.getChildren(path, this), new FutureCallback<NodeChildren>() {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        Object oldResult = lastResult.getReference();
-        lastResult.compareAndSet(oldResult, null, true, false);
-
-        if (result.equals(oldResult)) {
-          return;
-        }
-
-        if (!(oldResult instanceof NodeChildren)) {
-          // Something very wrong
-          LOG.error("The same watcher has been used for different event type.");
-          return;
-        }
-
-        NodeChildren oldNodeChildren = (NodeChildren) oldResult;
-        if (!result.getChildren().equals(oldNodeChildren.getChildren())) {
-          process(new WatchedEvent(Event.EventType.NodeChildrenChanged, Event.KeeperState.SyncConnected, path));
-        } else {
-          process(new WatchedEvent(Event.EventType.NodeDataChanged, Event.KeeperState.SyncConnected, path));
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        if (RetryUtils.canRetry(t)) {
-          children();
-          return;
-        }
-
-        lastResult.set(null, false);
-        if (t instanceof KeeperException) {
-          KeeperException.Code code = ((KeeperException) t).code();
-          if (code == KeeperException.Code.NONODE) {
-            // Node deleted
-            process(new WatchedEvent(Event.EventType.NodeDeleted, Event.KeeperState.SyncConnected, path));
-            return;
-          }
-        }
-        LOG.error("Fail to re-set watch on getChildren for path " + path, t);
-      }
-    });
-  }
-
-  private void data() {
-    Futures.addCallback(client.getData(path, this), new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        Object oldResult = lastResult.getReference();
-        lastResult.compareAndSet(oldResult, null, true, false);
-
-        if (!result.equals(oldResult)) {
-          // Whenever something changed, treated it as data changed.
-          process(new WatchedEvent(Event.EventType.NodeDataChanged, Event.KeeperState.SyncConnected, path));
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        if (RetryUtils.canRetry(t)) {
-          data();
-          return;
-        }
-
-        lastResult.set(null, false);
-        if (t instanceof KeeperException) {
-          KeeperException.Code code = ((KeeperException) t).code();
-          if (code == KeeperException.Code.NONODE) {
-            // Node deleted
-            process(new WatchedEvent(Event.EventType.NodeDeleted, Event.KeeperState.SyncConnected, path));
-            return;
-          }
-        }
-        LOG.error("Fail to re-set watch on getData for path " + path, t);
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
deleted file mode 100644
index 402c153..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.internal.zookeeper.RewatchOnExpireWatcher.ActionType;
-import org.apache.twill.zookeeper.ForwardingZKClient;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * A {@link ZKClient} that will rewatch automatically when session expired and reconnect.
- * The rewatch logic is mainly done in {@link RewatchOnExpireWatcher}.
- */
-public final class RewatchOnExpireZKClient extends ForwardingZKClient {
-
-  public RewatchOnExpireZKClient(ZKClient delegate) {
-    super(delegate);
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path, Watcher watcher) {
-    final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.EXISTS, path, watcher);
-    OperationFuture<Stat> result = super.exists(path, wrappedWatcher);
-    Futures.addCallback(result, new FutureCallback<Stat>() {
-      @Override
-      public void onSuccess(Stat result) {
-        wrappedWatcher.setLastResult(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // No-op
-      }
-    });
-    return result;
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
-    final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.CHILDREN, path, watcher);
-    OperationFuture<NodeChildren> result = super.getChildren(path, wrappedWatcher);
-    Futures.addCallback(result, new FutureCallback<NodeChildren>() {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        wrappedWatcher.setLastResult(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // No-op
-      }
-    });
-    return result;
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path, Watcher watcher) {
-    final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.DATA, path, watcher);
-    OperationFuture<NodeData> result = super.getData(path, wrappedWatcher);
-    Futures.addCallback(result, new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        wrappedWatcher.setLastResult(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // No-op
-      }
-    });
-    return result;
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
deleted file mode 100644
index 7544e56..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.zookeeper.OperationFuture;
-import com.google.common.util.concurrent.AbstractFuture;
-
-import javax.annotation.Nullable;
-import java.util.concurrent.Executor;
-
-/**
- * An implementation for {@link OperationFuture} that allows setting result directly.
- * Also, all listener callback will be fired from the given executor.
- */
-public final class SettableOperationFuture<V> extends AbstractFuture<V> implements OperationFuture<V> {
-
-  private final String requestPath;
-  private final Executor executor;
-
-  public static <V> SettableOperationFuture<V> create(String path, Executor executor) {
-    return new SettableOperationFuture<V>(path, executor);
-  }
-
-  private SettableOperationFuture(String requestPath, Executor executor) {
-    this.requestPath = requestPath;
-    this.executor = executor;
-  }
-
-  @Override
-  public String getRequestPath() {
-    return requestPath;
-  }
-
-  @Override
-  public void addListener(final Runnable listener, final Executor exec) {
-    super.addListener(new Runnable() {
-      @Override
-      public void run() {
-        exec.execute(listener);
-      }
-    }, executor);
-  }
-
-  @Override
-  public boolean setException(Throwable throwable) {
-    return super.setException(throwable);
-  }
-
-  @Override
-  public boolean set(@Nullable V value) {
-    return super.set(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/package-info.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/package-info.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/package-info.java
deleted file mode 100644
index d2afa11..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal classes for zookeeper.
- */
-package org.apache.twill.internal.zookeeper;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
deleted file mode 100644
index 3f3003d..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import javax.annotation.Nullable;
-
-/**
- *
- */
-public abstract class ForwardingZKClient implements ZKClient {
-
-  private final ZKClient delegate;
-
-  protected ForwardingZKClient(ZKClient delegate) {
-    this.delegate = delegate;
-  }
-
-  public final ZKClient getDelegate() {
-    return delegate;
-  }
-
-  @Override
-  public Long getSessionId() {
-    return delegate.getSessionId();
-  }
-
-  @Override
-  public String getConnectString() {
-    return delegate.getConnectString();
-  }
-
-  @Override
-  public void addConnectionWatcher(Watcher watcher) {
-    delegate.addConnectionWatcher(watcher);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
-    return create(path, data, createMode, true);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
-                                        boolean createParent) {
-    return delegate.create(path, data, createMode, createParent);
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path) {
-    return exists(path, null);
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
-    return delegate.exists(path, watcher);
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return getChildren(path, null);
-  }
-
-  @Override
-  public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
-    return delegate.getChildren(path, watcher);
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return getData(path, null);
-  }
-
-  @Override
-  public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
-    return delegate.getData(path, watcher);
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return setData(path, data, -1);
-  }
-
-  @Override
-  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
-    return delegate.setData(dataPath, data, version);
-  }
-
-  @Override
-  public OperationFuture<String> delete(String path) {
-    return delete(path, -1);
-  }
-
-  @Override
-  public OperationFuture<String> delete(String deletePath, int version) {
-    return delegate.delete(deletePath, version);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java
deleted file mode 100644
index 10391b2..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.zookeeper.ZooKeeper;
-
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public abstract class ForwardingZKClientService extends ForwardingZKClient implements ZKClientService {
-
-  private final ZKClientService delegate;
-
-  protected ForwardingZKClientService(ZKClientService delegate) {
-    super(delegate);
-    this.delegate = delegate;
-  }
-
-  @Override
-  public Supplier<ZooKeeper> getZooKeeperSupplier() {
-    return delegate.getZooKeeperSupplier();
-  }
-
-  @Override
-  public ListenableFuture<State> start() {
-    return delegate.start();
-  }
-
-  @Override
-  public State startAndWait() {
-    return Futures.getUnchecked(start());
-  }
-
-  @Override
-  public boolean isRunning() {
-    return delegate.isRunning();
-  }
-
-  @Override
-  public State state() {
-    return delegate.state();
-  }
-
-  @Override
-  public ListenableFuture<State> stop() {
-    return delegate.stop();
-  }
-
-  @Override
-  public State stopAndWait() {
-    return Futures.getUnchecked(stop());
-  }
-
-  @Override
-  public void addListener(Listener listener, Executor executor) {
-    delegate.addListener(listener, executor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/NodeChildren.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/NodeChildren.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/NodeChildren.java
deleted file mode 100644
index b432c01..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/NodeChildren.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.zookeeper.data.Stat;
-
-import java.util.List;
-
-/**
- * Represents result of call to {@link ZKClientService#getChildren(String, org.apache.zookeeper.Watcher)} method.
- */
-public interface NodeChildren {
-
-  /**
-   * @return The {@link Stat} of the node.
-   */
-  Stat getStat();
-
-  /**
-   * @return List of children node names.
-   */
-  List<String> getChildren();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
deleted file mode 100644
index ac15957..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.zookeeper.data.Stat;
-
-import javax.annotation.Nullable;
-
-/**
- * Represents result of call to {@link ZKClientService#getData(String, org.apache.zookeeper.Watcher)}.
- */
-public interface NodeData {
-
-  /**
-   * @return The {@link Stat} of the node.
-   */
-  Stat getStat();
-
-  /**
-   * @return Data stored in the node, or {@code null} if there is no data.
-   */
-  @Nullable
-  byte[] getData();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/OperationFuture.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/OperationFuture.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/OperationFuture.java
deleted file mode 100644
index fafaa7a..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/OperationFuture.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * A {@link ListenableFuture} that also provides the requested path for a operation.
- *
- * @param <V> The result type returned by this Future's {@link #get()} method.
- */
-public interface OperationFuture<V> extends ListenableFuture<V> {
-
-  /**
-   * @return The path being requested for the ZooKeeper operation.
-   */
-  String getRequestPath();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategies.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategies.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategies.java
deleted file mode 100644
index 56474b7..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategies.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import com.google.common.base.Preconditions;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Factory for creating common {@link RetryStrategy} implementation.
- */
-public final class RetryStrategies {
-
-  /**
-   * @return A {@link RetryStrategy} that doesn't do any retry.
-   */
-  public static RetryStrategy noRetry() {
-    return new RetryStrategy() {
-      @Override
-      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
-        return -1;
-      }
-    };
-  }
-
-  /**
-   * Creates a {@link RetryStrategy} that retries maximum given number of times, with the actual
-   * delay behavior delegated to another {@link RetryStrategy}.
-   * @param limit Maximum number of retries allowed.
-   * @param strategy When failure count is less than or equal to the limit, this strategy will be called.
-   * @return A {@link RetryStrategy}.
-   */
-  public static RetryStrategy limit(final int limit, final RetryStrategy strategy) {
-    Preconditions.checkArgument(limit >= 0, "limit must be >= 0");
-    return new RetryStrategy() {
-      @Override
-      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
-        return (failureCount <= limit) ? strategy.nextRetry(failureCount, startTime, type, path) : -1L;
-      }
-    };
-  }
-
-  /**
-   * Creates a {@link RetryStrategy} that imposes a fix delay between each retries.
-   * @param delay delay time
-   * @param delayUnit {@link TimeUnit} for the delay.
-   * @return A {@link RetryStrategy}.
-   */
-  public static RetryStrategy fixDelay(final long delay, final TimeUnit delayUnit) {
-    Preconditions.checkArgument(delay >= 0, "delay must be >= 0");
-    return new RetryStrategy() {
-      @Override
-      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
-        return TimeUnit.MILLISECONDS.convert(delay, delayUnit);
-      }
-    };
-  }
-
-  /**
-   * Creates a {@link RetryStrategy} that will increase delay exponentially between each retries.
-   * @param baseDelay delay to start with.
-   * @param maxDelay cap of the delay.
-   * @param delayUnit {@link TimeUnit} for the delays.
-   * @return A {@link RetryStrategy}.
-   */
-  public static RetryStrategy exponentialDelay(final long baseDelay, final long maxDelay, final TimeUnit delayUnit) {
-    Preconditions.checkArgument(baseDelay >= 0, "base delay must be >= 0");
-    Preconditions.checkArgument(maxDelay >= 0, "max delay must be >= 0");
-    return new RetryStrategy() {
-      @Override
-      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
-        long power = failureCount > Long.SIZE ? Long.MAX_VALUE : (1L << (failureCount - 1));
-        long delay = Math.min(baseDelay * power, maxDelay);
-        delay = delay < 0 ? maxDelay : delay;
-        return TimeUnit.MILLISECONDS.convert(delay, delayUnit);
-      }
-    };
-  }
-
-  /**
-   * Creates a {@link RetryStrategy} that will retry until maximum amount of time has been passed since the request,
-   * with the actual delay behavior delegated to another {@link RetryStrategy}.
-   * @param maxElapseTime Maximum amount of time until giving up retry.
-   * @param timeUnit {@link TimeUnit} for the max elapse time.
-   * @param strategy When time elapsed is less than or equal to the limit, this strategy will be called.
-   * @return A {@link RetryStrategy}.
-   */
-  public static RetryStrategy timeLimit(long maxElapseTime, TimeUnit timeUnit, final RetryStrategy strategy) {
-    Preconditions.checkArgument(maxElapseTime >= 0, "max elapse time must be >= 0");
-    final long maxElapseMs = TimeUnit.MILLISECONDS.convert(maxElapseTime, timeUnit);
-    return new RetryStrategy() {
-      @Override
-      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
-        long elapseTime = System.currentTimeMillis() - startTime;
-        return elapseTime <= maxElapseMs ? strategy.nextRetry(failureCount, startTime, type, path) : -1L;
-      }
-    };
-  }
-
-  private RetryStrategies() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
deleted file mode 100644
index 3301e8a..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-/**
- * Provides strategy to use for operation retries.
- */
-public interface RetryStrategy {
-
-  /**
-   * Defines ZooKeeper operation type that triggers retry.
-   */
-  enum OperationType {
-    CREATE,
-    EXISTS,
-    GET_CHILDREN,
-    GET_DATA,
-    SET_DATA,
-    DELETE
-  }
-
-  /**
-   * Returns the number of milliseconds to wait before retrying the operation.
-   *
-   * @param failureCount Number of times that the request has been failed.
-   * @param startTime Timestamp in milliseconds that the request starts.
-   * @param type Type of operation tried to perform.
-   * @param path The path that the operation is acting on.
-   * @return Number of milliseconds to wait before retrying the operation. Returning {@code 0} means
-   *         retry it immediately, while negative means abort the operation.
-   */
-  long nextRetry(int failureCount, long startTime, OperationType type, String path);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
deleted file mode 100644
index d60182e..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import javax.annotation.Nullable;
-
-/**
- * A ZooKeeper client that provides asynchronous zookeeper operations.
- */
-public interface ZKClient {
-
-  /**
-   * Returns the current Zookeeper session ID of this client.
-   * If this ZKClient is not connected, {@code null} is returned.
-   */
-  Long getSessionId();
-
-  /**
-   * Returns the connection string used for connecting to Zookeeper.
-   */
-  String getConnectString();
-
-  /**
-   * Adds a {@link Watcher} that will be called whenever connection state change.
-   * @param watcher The watcher to set.
-   */
-  void addConnectionWatcher(Watcher watcher);
-
-  /**
-   * Same as calling
-   * {@link #create(String, byte[], org.apache.zookeeper.CreateMode, boolean) create(path, data, createMode, true)}.
-   *
-   * @see #create(String, byte[], org.apache.zookeeper.CreateMode, boolean)
-   */
-  OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode);
-
-  /**
-   * Creates a path in zookeeper, with given data and create mode.
-   *
-   * @param path Path to be created
-   * @param data Data to be stored in the node, or {@code null} if no data to store.
-   * @param createMode The {@link org.apache.zookeeper.CreateMode} for the node.
-   * @param createParent If {@code true} and parent nodes are missing, it will create all parent nodes as normal
-   *                     persistent node before creating the request node.
-   * @return A {@link OperationFuture} that will be completed when the
-   *         creation is done. If there is error during creation, it will be reflected as error in the future.
-   */
-  OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent);
-
-  /**
-   * Checks if the path exists. Same as calling
-   * {@link #exists(String, org.apache.zookeeper.Watcher) exists(path, null)}.
-   *
-   * @see #exists(String, org.apache.zookeeper.Watcher)
-   */
-  OperationFuture<Stat> exists(String path);
-
-  /**
-   * Checks if the given path exists and leave a watcher on the node for watching creation/deletion/data changes
-   * on the node.
-   *
-   * @param path The path to check for existence.
-   * @param watcher Watcher for watching changes, or {@code null} if no watcher to set.
-   * @return A {@link OperationFuture} that will be completed when the exists check is done. If the path
-   *         does exists, the node {@link Stat} is set into the future. If the path doesn't exists,
-   *         a {@code null} value is set into the future.
-   */
-  OperationFuture<Stat> exists(String path, @Nullable Watcher watcher);
-
-  /**
-   * Gets the list of children nodes under the given path. Same as calling
-   * {@link #getChildren(String, org.apache.zookeeper.Watcher) getChildren(path, null)}.
-   *
-   * @see #getChildren(String, org.apache.zookeeper.Watcher)
-   */
-  OperationFuture<NodeChildren> getChildren(String path);
-
-  /**
-   * Gets the list of children nodes under the given path and leave a watcher on the node for watching node
-   * deletion and children nodes creation/deletion.
-   *
-   * @param path The path to fetch for children nodes
-   * @param watcher Watcher for watching changes, or {@code null} if no watcher to set.
-   * @return A {@link OperationFuture} that will be completed when the getChildren call is done, with the result
-   *         given as {@link NodeChildren}. If there is error, it will be reflected as error in the future.
-   */
-  OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher);
-
-  /**
-   * Gets the data stored in the given path. Same as calling
-   * {@link #getData(String, org.apache.zookeeper.Watcher) getData(path, null)}.
-   */
-  OperationFuture<NodeData> getData(String path);
-
-  /**
-   * Gets the data stored in the given path and leave a watcher on the node for watching deletion/data changes on
-   * the node.
-   *
-   * @param path The path to get data from.
-   * @param watcher Watcher for watching changes, or {@code null} if no watcher to set.
-   * @return A {@link OperationFuture} that will be completed when the getData call is done, with the result
-   *         given as {@link NodeData}. If there is error, it will be reflected as error in the future.
-   */
-  OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher);
-
-  /**
-   * Sets the data for the given path without matching version. Same as calling
-   * {@link #setData(String, byte[], int) setData(path, data, -1)}.
-   */
-  OperationFuture<Stat> setData(String path, byte[] data);
-
-  /**
-   * Sets the data for the given path that match the given version. If the version given is {@code -1}, it matches
-   * any version.
-   *
-   * @param dataPath The path to set data to.
-   * @param data Data to be set.
-   * @param version Matching version.
-   * @return A {@link OperationFuture} that will be completed when the setData call is done, with node {@link Stat}
-   *         given as the future result. If there is error, it will be reflected as error in the future.
-   */
-  OperationFuture<Stat> setData(String dataPath, byte[] data, int version);
-
-  /**
-   * Deletes the node of the given path without matching version. Same as calling
-   * {@link #delete(String, int) delete(path, -1)}.
-   *
-   * @see #delete(String, int)
-   */
-  OperationFuture<String> delete(String path);
-
-  /**
-   * Deletes the node of the given path that match the given version. If the version given is {@code -1}, it matches
-   * any version.
-   *
-   * @param deletePath The path to set data to.
-   * @param version Matching version.
-   * @return A {@link OperationFuture} that will be completed when the setData call is done, with node path
-   *         given as the future result. If there is error, it will be reflected as error in the future.
-   */
-  OperationFuture<String> delete(String deletePath, int version);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
deleted file mode 100644
index 63f27fb..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.twill.internal.zookeeper.DefaultZKClientService;
-import com.google.common.base.Supplier;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Service;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * A {@link ZKClient} that extends from {@link Service} to provide lifecycle management functions.
- * The {@link #start()} method needed to be called before calling any other method on this interface.
- * When the client is no longer needed, call {@link #stop()} to release any resources that it holds.
- */
-public interface ZKClientService extends ZKClient, Service {
-
-  /**
-   * Returns a {@link Supplier} of {@link ZooKeeper} that gives the current {@link ZooKeeper} in use at the moment
-   * when {@link com.google.common.base.Supplier#get()} get called.
-   *
-   * @return A {@link Supplier Supplier&lt;ZooKeeper&gt;}
-   */
-  Supplier<ZooKeeper> getZooKeeperSupplier();
-
-  /**
-   * Builder for creating an implementation of {@link ZKClientService}.
-   * The default client timeout is 10000ms.
-   */
-  public static final class Builder {
-
-    private final String connectStr;
-    private int timeout = 10000;
-    private Watcher connectionWatcher;
-    private Multimap<String, ACL> acls = HashMultimap.create();
-
-    /**
-     * Creates a {@link Builder} with the given ZooKeeper connection string.
-     * @param connectStr The connection string.
-     * @return A new instance of Builder.
-     */
-    public static Builder of(String connectStr) {
-      return new Builder(connectStr);
-    }
-
-    /**
-     * Sets the client timeout to the give milliseconds.
-     * @param timeout timeout in milliseconds.
-     * @return This builder
-     */
-    public Builder setSessionTimeout(int timeout) {
-      this.timeout = timeout;
-      return this;
-    }
-
-    /**
-     * Sets a {@link Watcher} that will be called whenever connection state change.
-     * @param watcher The watcher to set.
-     * @return This builder.
-     */
-    public Builder setConnectionWatcher(Watcher watcher) {
-      this.connectionWatcher = watcher;
-      return this;
-    }
-
-    /**
-     * Creates an instance of {@link ZKClientService} with the settings of this builder.
-     * @return A new instance of {@link ZKClientService}.
-     */
-    public ZKClientService build() {
-      return new DefaultZKClientService(connectStr, timeout, connectionWatcher);
-    }
-
-    private Builder(String connectStr) {
-      this.connectStr = connectStr;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
deleted file mode 100644
index cc38c76..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import javax.annotation.Nullable;
-
-/**
- * Provides static factory method to create {@link ZKClientService} with modified behaviors.
- */
-public final class ZKClientServices {
-
-  /**
-   * Creates a {@link ZKClientService} from the given {@link ZKClient} if the given {@link ZKClient} is an instance of
-   * {@link ZKClientService} or is a {@link ForwardingZKClient} that eventually trace back to a delegate of type
-   * {@link ZKClientService}. If such a {@link ZKClientService} instance is found, this method returns
-   * an instance by invoking {@link #delegate(ZKClient, ZKClientService)} with the given {@link ZKClient} and
-   * the {@link ZKClientService} found respectively.
-   *
-   * @param client The {@link ZKClient}.
-   * @return A {@link ZKClientService}.
-   * @throws IllegalArgumentException If no {@link ZKClientService} is found.
-   */
-  public static ZKClientService delegate(ZKClient client) {
-    ZKClient zkClient = client;
-    while (!(zkClient instanceof ZKClientService) && zkClient instanceof ForwardingZKClient) {
-      zkClient = ((ForwardingZKClient) zkClient).getDelegate();
-    }
-    if (zkClient instanceof ZKClientService) {
-      return delegate(client, (ZKClientService) zkClient);
-    }
-    throw new IllegalArgumentException("No ZKClientService found from the delegation hierarchy");
-  }
-
-  /**
-   * Creates a {@link ZKClientService} that for all {@link ZKClient} methods would be delegated to another
-   * {@link ZKClient}, while methods for {@link ZKClientService} would be delegated to another {@link ZKClientService},
-   * which the given {@link ZKClient} and {@link ZKClientService} could be different instances.
-   *
-   * @param client The {@link ZKClient} for delegation
-   * @param clientService The {@link ZKClientService} for delegation.
-   * @return A {@link ZKClientService}.
-   */
-  public static ZKClientService delegate(final ZKClient client, ZKClientService clientService) {
-    return new ForwardingZKClientService(clientService) {
-
-      @Override
-      public Long getSessionId() {
-        return client.getSessionId();
-      }
-
-      @Override
-      public String getConnectString() {
-        return client.getConnectString();
-      }
-
-      @Override
-      public void addConnectionWatcher(Watcher watcher) {
-        client.addConnectionWatcher(watcher);
-      }
-
-      @Override
-      public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
-        return client.create(path, data, createMode);
-      }
-
-      @Override
-      public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
-                                            boolean createParent) {
-        return client.create(path, data, createMode, createParent);
-      }
-
-      @Override
-      public OperationFuture<Stat> exists(String path) {
-        return client.exists(path);
-      }
-
-      @Override
-      public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
-        return client.exists(path, watcher);
-      }
-
-      @Override
-      public OperationFuture<NodeChildren> getChildren(String path) {
-        return client.getChildren(path);
-      }
-
-      @Override
-      public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
-        return client.getChildren(path, watcher);
-      }
-
-      @Override
-      public OperationFuture<NodeData> getData(String path) {
-        return client.getData(path);
-      }
-
-      @Override
-      public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
-        return client.getData(path, watcher);
-      }
-
-      @Override
-      public OperationFuture<Stat> setData(String path, byte[] data) {
-        return client.setData(path, data);
-      }
-
-      @Override
-      public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
-        return client.setData(dataPath, data, version);
-      }
-
-      @Override
-      public OperationFuture<String> delete(String path) {
-        return client.delete(path);
-      }
-
-      @Override
-      public OperationFuture<String> delete(String deletePath, int version) {
-        return client.delete(deletePath, version);
-      }
-    };
-  }
-
-  private ZKClientServices() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClients.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClients.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClients.java
deleted file mode 100644
index f67c1bd..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClients.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.twill.internal.zookeeper.FailureRetryZKClient;
-import org.apache.twill.internal.zookeeper.NamespaceZKClient;
-import org.apache.twill.internal.zookeeper.RewatchOnExpireZKClient;
-
-/**
- *
- */
-public final class ZKClients {
-
-  /**
-   * Creates a {@link ZKClient} that will perform auto re-watch on all existing watches
-   * when reconnection happens after session expiration. All {@link org.apache.zookeeper.Watcher Watchers}
-   * set through the returned {@link ZKClient} would not receive any connection events.
-   *
-   * @param client The {@link ZKClient} for operations delegation.
-   * @return A {@link ZKClient} that will do auto re-watch on all methods that accept a
-   *        {@link org.apache.zookeeper.Watcher} upon session expiration.
-   */
-  public static ZKClient reWatchOnExpire(ZKClient client) {
-    return new RewatchOnExpireZKClient(client);
-  }
-
-  /**
-   * Creates a {@link ZKClient} that will retry interim failure (e.g. connection loss, session expiration)
-   * based on the given {@link RetryStrategy}.
-   *
-   * @param client The {@link ZKClient} for operations delegation.
-   * @param retryStrategy The {@link RetryStrategy} to be invoke when there is operation failure.
-   * @return A {@link ZKClient}.
-   */
-  public static ZKClient retryOnFailure(ZKClient client, RetryStrategy retryStrategy) {
-    return new FailureRetryZKClient(client, retryStrategy);
-  }
-
-
-  public static ZKClient namespace(ZKClient zkClient, String namespace) {
-    return new NamespaceZKClient(zkClient, namespace);
-  }
-
-  private ZKClients() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
deleted file mode 100644
index 6dcd1a7..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.zookeeper.SettableOperationFuture;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Collection of helper methods for common operations that usually needed when interacting with ZooKeeper.
- */
-public final class ZKOperations {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ZKOperations.class);
-
-  /**
-   * Represents a ZK operation updates callback.
-   * @param <T> Type of updated data.
-   */
-  public interface Callback<T> {
-    void updated(T data);
-  }
-
-  /**
-   * Interface for defining callback method to receive node data updates.
-   */
-  public interface DataCallback extends Callback<NodeData> {
-    /**
-     * Invoked when data of the node changed.
-     * @param nodeData New data of the node, or {@code null} if the node has been deleted.
-     */
-    @Override
-    void updated(NodeData nodeData);
-  }
-
-  /**
-   * Interface for defining callback method to receive children nodes updates.
-   */
-  public interface ChildrenCallback extends Callback<NodeChildren> {
-    @Override
-    void updated(NodeChildren nodeChildren);
-  }
-
-  private interface Operation<T> {
-    ZKClient getZKClient();
-
-    OperationFuture<T> exec(String path, Watcher watcher);
-  }
-
-  /**
-   * Watch for data changes of the given path. The callback will be triggered whenever changes has been
-   * detected. Note that the callback won't see every single changes, as that's not the guarantee of ZooKeeper.
-   * If the node doesn't exists, it will watch for its creation then starts watching for data changes.
-   * When the node is deleted afterwards,
-   *
-   * @param zkClient The {@link ZKClient} for the operation
-   * @param path Path to watch
-   * @param callback Callback to be invoked when data changes is detected.
-   * @return A {@link Cancellable} to cancel the watch.
-   */
-  public static Cancellable watchData(final ZKClient zkClient, final String path, final DataCallback callback) {
-    final AtomicBoolean cancelled = new AtomicBoolean(false);
-    watchChanges(new Operation<NodeData>() {
-
-      @Override
-      public ZKClient getZKClient() {
-        return zkClient;
-      }
-
-      @Override
-      public OperationFuture<NodeData> exec(String path, Watcher watcher) {
-        return zkClient.getData(path, watcher);
-      }
-    }, path, callback, cancelled);
-
-    return new Cancellable() {
-      @Override
-      public void cancel() {
-        cancelled.set(true);
-      }
-    };
-  }
-
-  public static ListenableFuture<String> watchDeleted(final ZKClient zkClient, final String path) {
-    SettableFuture<String> completion = SettableFuture.create();
-    watchDeleted(zkClient, path, completion);
-    return completion;
-  }
-
-  public static void watchDeleted(final ZKClient zkClient, final String path,
-                                  final SettableFuture<String> completion) {
-
-    Futures.addCallback(zkClient.exists(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (!completion.isDone()) {
-          if (event.getType() == Event.EventType.NodeDeleted) {
-            completion.set(path);
-          } else {
-            watchDeleted(zkClient, path, completion);
-          }
-        }
-      }
-    }), new FutureCallback<Stat>() {
-      @Override
-      public void onSuccess(Stat result) {
-        if (result == null) {
-          completion.set(path);
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        completion.setException(t);
-      }
-    });
-  }
-
-  public static Cancellable watchChildren(final ZKClient zkClient, String path, ChildrenCallback callback) {
-    final AtomicBoolean cancelled = new AtomicBoolean(false);
-    watchChanges(new Operation<NodeChildren>() {
-
-      @Override
-      public ZKClient getZKClient() {
-        return zkClient;
-      }
-
-      @Override
-      public OperationFuture<NodeChildren> exec(String path, Watcher watcher) {
-        return zkClient.getChildren(path, watcher);
-      }
-    }, path, callback, cancelled);
-
-    return new Cancellable() {
-      @Override
-      public void cancel() {
-        cancelled.set(true);
-      }
-    };
-  }
-
-  /**
-   * Returns a new {@link OperationFuture} that the result will be the same as the given future, except that when
-   * the source future is having an exception matching the giving exception type, the errorResult will be set
-   * in to the returned {@link OperationFuture}.
-   * @param future The source future.
-   * @param exceptionType Type of {@link KeeperException} to be ignored.
-   * @param errorResult Object to be set into the resulting future on a matching exception.
-   * @param <V> Type of the result.
-   * @return A new {@link OperationFuture}.
-   */
-  public static <V> OperationFuture<V> ignoreError(OperationFuture<V> future,
-                                                   final Class<? extends KeeperException> exceptionType,
-                                                   final V errorResult) {
-    final SettableOperationFuture<V> resultFuture = SettableOperationFuture.create(future.getRequestPath(),
-                                                                                   Threads.SAME_THREAD_EXECUTOR);
-
-    Futures.addCallback(future, new FutureCallback<V>() {
-      @Override
-      public void onSuccess(V result) {
-        resultFuture.set(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        if (exceptionType.isAssignableFrom(t.getClass())) {
-          resultFuture.set(errorResult);
-        } else if (t instanceof CancellationException) {
-          resultFuture.cancel(true);
-        } else {
-          resultFuture.setException(t);
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    return resultFuture;
-  }
-
-  /**
-   * Deletes the given path recursively. The delete method will keep running until the given path is successfully
-   * removed, which means if there are new node created under the given path while deleting, they'll get deleted
-   * again.  If there is {@link KeeperException} during the deletion other than
-   * {@link KeeperException.NotEmptyException} or {@link KeeperException.NoNodeException},
-   * the exception would be reflected in the result future and deletion process will stop,
-   * leaving the given path with intermediate state.
-   *
-   * @param path The path to delete.
-   * @return An {@link OperationFuture} that will be completed when the given path is deleted or bailed due to
-   *         exception.
-   */
-  public static OperationFuture<String> recursiveDelete(final ZKClient zkClient, final String path) {
-    final SettableOperationFuture<String> resultFuture =
-      SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
-
-    // Try to delete the given path.
-    Futures.addCallback(zkClient.delete(path), new FutureCallback<String>() {
-      private final FutureCallback<String> deleteCallback = this;
-
-      @Override
-      public void onSuccess(String result) {
-        // Path deleted successfully. Operation done.
-        resultFuture.set(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // Failed to delete the given path
-        if (!(t instanceof KeeperException.NotEmptyException || t instanceof KeeperException.NoNodeException)) {
-          // For errors other than NotEmptyException, treat the operation as failed.
-          resultFuture.setException(t);
-          return;
-        }
-
-        // If failed because of NotEmptyException, get the list of children under the given path
-        Futures.addCallback(zkClient.getChildren(path), new FutureCallback<NodeChildren>() {
-
-          @Override
-          public void onSuccess(NodeChildren result) {
-            // Delete all children nodes recursively.
-            final List<OperationFuture<String>> deleteFutures = Lists.newLinkedList();
-            for (String child :result.getChildren()) {
-              deleteFutures.add(recursiveDelete(zkClient, path + "/" + child));
-            }
-
-            // When deletion of all children succeeded, delete the given path again.
-            Futures.successfulAsList(deleteFutures).addListener(new Runnable() {
-              @Override
-              public void run() {
-                for (OperationFuture<String> deleteFuture : deleteFutures) {
-                  try {
-                    // If any exception when deleting children, treat the operation as failed.
-                    deleteFuture.get();
-                  } catch (Exception e) {
-                    resultFuture.setException(e.getCause());
-                  }
-                }
-                Futures.addCallback(zkClient.delete(path), deleteCallback, Threads.SAME_THREAD_EXECUTOR);
-              }
-            }, Threads.SAME_THREAD_EXECUTOR);
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            // If failed to get list of children, treat the operation as failed.
-            resultFuture.setException(t);
-          }
-        }, Threads.SAME_THREAD_EXECUTOR);
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    return resultFuture;
-  }
-
-  /**
-   * Watch for the given path until it exists.
-   * @param zkClient The {@link ZKClient} to use.
-   * @param path A ZooKeeper path to watch for existent.
-   */
-  private static void watchExists(final ZKClient zkClient, final String path, final SettableFuture<String> completion) {
-    Futures.addCallback(zkClient.exists(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (!completion.isDone()) {
-          watchExists(zkClient, path, completion);
-        }
-      }
-    }), new FutureCallback<Stat>() {
-      @Override
-      public void onSuccess(Stat result) {
-        if (result != null) {
-          completion.set(path);
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        completion.setException(t);
-      }
-    });
-  }
-
-  private static <T> void watchChanges(final Operation<T> operation, final String path,
-                                       final Callback<T> callback, final AtomicBoolean cancelled) {
-    Futures.addCallback(operation.exec(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (!cancelled.get()) {
-          watchChanges(operation, path, callback, cancelled);
-        }
-      }
-    }), new FutureCallback<T>() {
-      @Override
-      public void onSuccess(T result) {
-        if (!cancelled.get()) {
-          callback.updated(result);
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
-          final SettableFuture<String> existCompletion = SettableFuture.create();
-          existCompletion.addListener(new Runnable() {
-            @Override
-            public void run() {
-              try {
-                if (!cancelled.get()) {
-                  watchChanges(operation, existCompletion.get(), callback, cancelled);
-                }
-              } catch (Exception e) {
-                LOG.error("Failed to watch children for path " + path, e);
-              }
-            }
-          }, Threads.SAME_THREAD_EXECUTOR);
-          watchExists(operation.getZKClient(), path, existCompletion);
-          return;
-        }
-        LOG.error("Failed to watch data for path " + path + " " + t, t);
-      }
-    });
-  }
-
-  private ZKOperations() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java b/zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
deleted file mode 100644
index e5bd237..0000000
--- a/zookeeper/src/main/java/org/apache/twill/zookeeper/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package provides functionality for ZooKeeper interactions.
- */
-package org.apache.twill.zookeeper;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java b/zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
deleted file mode 100644
index 601f0bd..0000000
--- a/zookeeper/src/test/java/org/apache/twill/zookeeper/RetryStrategyTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class RetryStrategyTest {
-
-  @Test
-  public void testNoRetry() {
-    RetryStrategy strategy = RetryStrategies.noRetry();
-    long startTime = System.currentTimeMillis();
-    for (int i = 1; i <= 10; i++) {
-      Assert.assertEquals(-1L, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
-    }
-  }
-
-  @Test
-  public void testLimit() {
-    RetryStrategy strategy = RetryStrategies.limit(10, RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS));
-    long startTime = System.currentTimeMillis();
-    for (int i = 1; i <= 10; i++) {
-      Assert.assertEquals(1L, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
-    }
-    Assert.assertEquals(-1L, strategy.nextRetry(11, startTime, RetryStrategy.OperationType.CREATE, "/"));
-  }
-
-  @Test
-  public void testUnlimited() {
-    RetryStrategy strategy = RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS);
-    long startTime = System.currentTimeMillis();
-    for (int i = 1; i <= 10; i++) {
-      Assert.assertEquals(1L, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
-    }
-    Assert.assertEquals(1L, strategy.nextRetry(100000, startTime, RetryStrategy.OperationType.CREATE, "/"));
-  }
-
-  @Test
-  public void testExponential() {
-    RetryStrategy strategy = RetryStrategies.exponentialDelay(1, 60000, TimeUnit.MILLISECONDS);
-    long startTime = System.currentTimeMillis();
-    for (int i = 1; i <= 16; i++) {
-      Assert.assertEquals(1L << (i - 1), strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
-    }
-    for (int i = 60; i <= 80; i++) {
-      Assert.assertEquals(60000, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
-    }
-  }
-
-  @Test
-  public void testExponentialLimit() {
-    RetryStrategy strategy = RetryStrategies.limit(99,
-                                                   RetryStrategies.exponentialDelay(1, 60000, TimeUnit.MILLISECONDS));
-    long startTime = System.currentTimeMillis();
-    for (int i = 1; i <= 16; i++) {
-      Assert.assertEquals(1L << (i - 1), strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
-    }
-    for (int i = 60; i <= 80; i++) {
-      Assert.assertEquals(60000, strategy.nextRetry(i, startTime, RetryStrategy.OperationType.CREATE, "/"));
-    }
-    Assert.assertEquals(-1L, strategy.nextRetry(100, startTime, RetryStrategy.OperationType.CREATE, "/"));
-  }
-
-  @Test
-  public void testTimeLimit() throws InterruptedException {
-    RetryStrategy strategy = RetryStrategies.timeLimit(1, TimeUnit.SECONDS,
-                                                       RetryStrategies.fixDelay(1, TimeUnit.MILLISECONDS));
-    long startTime = System.currentTimeMillis();
-    Assert.assertEquals(1L, strategy.nextRetry(1, startTime, RetryStrategy.OperationType.CREATE, "/"));
-    TimeUnit.MILLISECONDS.sleep(1100);
-    Assert.assertEquals(-1L, strategy.nextRetry(2, startTime, RetryStrategy.OperationType.CREATE, "/"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
deleted file mode 100644
index f1db74a..0000000
--- a/zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.internal.zookeeper.KillZKSession;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- *
- */
-public class ZKClientTest {
-
-  @Test
-  public void testChroot() throws Exception {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
-    zkServer.startAndWait();
-
-    try {
-      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/chroot").build();
-      client.startAndWait();
-      try {
-        List<OperationFuture<String>> futures = Lists.newArrayList();
-        futures.add(client.create("/test1/test2", null, CreateMode.PERSISTENT));
-        futures.add(client.create("/test1/test3", null, CreateMode.PERSISTENT));
-        Futures.successfulAsList(futures).get();
-
-        Assert.assertNotNull(client.exists("/test1/test2").get());
-        Assert.assertNotNull(client.exists("/test1/test3").get());
-
-      } finally {
-        client.stopAndWait();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  @Test
-  public void testCreateParent() throws ExecutionException, InterruptedException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
-    zkServer.startAndWait();
-
-    try {
-      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      client.startAndWait();
-
-      try {
-        String path = client.create("/test1/test2/test3/test4/test5",
-                                    "testing".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL).get();
-        Assert.assertTrue(path.startsWith("/test1/test2/test3/test4/test5"));
-
-        String dataPath = "";
-        for (int i = 1; i <= 4; i++) {
-          dataPath = dataPath + "/test" + i;
-          Assert.assertNull(client.getData(dataPath).get().getData());
-        }
-        Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData(path).get().getData()));
-      } finally {
-        client.stopAndWait();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  @Test
-  public void testGetChildren() throws ExecutionException, InterruptedException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
-    zkServer.startAndWait();
-
-    try {
-      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      client.startAndWait();
-
-      try {
-        client.create("/test", null, CreateMode.PERSISTENT).get();
-        Assert.assertTrue(client.getChildren("/test").get().getChildren().isEmpty());
-
-        Futures.allAsList(ImmutableList.of(client.create("/test/c1", null, CreateMode.EPHEMERAL),
-                                           client.create("/test/c2", null, CreateMode.EPHEMERAL))).get();
-
-        NodeChildren nodeChildren = client.getChildren("/test").get();
-        Assert.assertEquals(2, nodeChildren.getChildren().size());
-
-        Assert.assertEquals(ImmutableSet.of("c1", "c2"), ImmutableSet.copyOf(nodeChildren.getChildren()));
-
-      } finally {
-        client.stopAndWait();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  @Test
-  public void testSetData() throws ExecutionException, InterruptedException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
-    zkServer.startAndWait();
-
-    try {
-      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      client.startAndWait();
-
-      client.create("/test", null, CreateMode.PERSISTENT).get();
-      Assert.assertNull(client.getData("/test").get().getData());
-
-      client.setData("/test", "testing".getBytes()).get();
-      Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData("/test").get().getData()));
-
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  @Test
-  public void testExpireRewatch() throws InterruptedException, IOException, ExecutionException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
-    zkServer.startAndWait();
-
-    try {
-      final CountDownLatch expireReconnectLatch = new CountDownLatch(1);
-      final AtomicBoolean expired = new AtomicBoolean(false);
-      final ZKClientService client = ZKClientServices.delegate(ZKClients.reWatchOnExpire(
-                                        ZKClientService.Builder.of(zkServer.getConnectionStr())
-                                                       .setSessionTimeout(2000)
-                                                       .setConnectionWatcher(new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-              if (event.getState() == Event.KeeperState.Expired) {
-                expired.set(true);
-              } else if (event.getState() == Event.KeeperState.SyncConnected && expired.compareAndSet(true, true)) {
-                expireReconnectLatch.countDown();
-              }
-            }
-          }).build()));
-      client.startAndWait();
-
-      try {
-        final BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<Watcher.Event.EventType>();
-        client.exists("/expireRewatch", new Watcher() {
-          @Override
-          public void process(WatchedEvent event) {
-            client.exists("/expireRewatch", this);
-            events.add(event.getType());
-          }
-        });
-
-        client.create("/expireRewatch", null, CreateMode.PERSISTENT);
-        Assert.assertEquals(Watcher.Event.EventType.NodeCreated, events.poll(2, TimeUnit.SECONDS));
-
-        KillZKSession.kill(client.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 1000);
-
-        Assert.assertTrue(expireReconnectLatch.await(5, TimeUnit.SECONDS));
-
-        client.delete("/expireRewatch");
-        Assert.assertEquals(Watcher.Event.EventType.NodeDeleted, events.poll(4, TimeUnit.SECONDS));
-      } finally {
-        client.stopAndWait();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  @Test
-  public void testRetry() throws ExecutionException, InterruptedException, TimeoutException {
-    File dataDir = Files.createTempDir();
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(dataDir).setTickTime(1000).build();
-    zkServer.startAndWait();
-    int port = zkServer.getLocalAddress().getPort();
-
-    final CountDownLatch disconnectLatch = new CountDownLatch(1);
-    ZKClientService client = ZKClientServices.delegate(ZKClients.retryOnFailure(
-      ZKClientService.Builder.of(zkServer.getConnectionStr()).setConnectionWatcher(new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getState() == Event.KeeperState.Disconnected) {
-          disconnectLatch.countDown();
-        }
-      }
-    }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS)));
-    client.startAndWait();
-
-    zkServer.stopAndWait();
-
-    Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
-
-    final CountDownLatch createLatch = new CountDownLatch(1);
-    Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String result) {
-        createLatch.countDown();
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        t.printStackTrace(System.out);
-      }
-    });
-
-    TimeUnit.SECONDS.sleep(2);
-    zkServer = InMemoryZKServer.builder()
-                               .setDataDir(dataDir)
-                               .setAutoCleanDataDir(true)
-                               .setPort(port)
-                               .setTickTime(1000)
-                               .build();
-    zkServer.startAndWait();
-
-    try {
-      Assert.assertTrue(createLatch.await(5, TimeUnit.SECONDS));
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java b/zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
deleted file mode 100644
index 9518d6e..0000000
--- a/zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.zookeeper;
-
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.zookeeper.CreateMode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- *
- */
-public class ZKOperationsTest {
-
-  @Test
-  public void recursiveDelete() throws ExecutionException, InterruptedException, TimeoutException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
-    zkServer.startAndWait();
-
-    try {
-      ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      client.startAndWait();
-
-      try {
-        client.create("/test1/test10/test101", null, CreateMode.PERSISTENT).get();
-        client.create("/test1/test10/test102", null, CreateMode.PERSISTENT).get();
-        client.create("/test1/test10/test103", null, CreateMode.PERSISTENT).get();
-
-        client.create("/test1/test11/test111", null, CreateMode.PERSISTENT).get();
-        client.create("/test1/test11/test112", null, CreateMode.PERSISTENT).get();
-        client.create("/test1/test11/test113", null, CreateMode.PERSISTENT).get();
-
-        ZKOperations.recursiveDelete(client, "/test1").get(2, TimeUnit.SECONDS);
-
-        Assert.assertNull(client.exists("/test1").get(2, TimeUnit.SECONDS));
-
-      } finally {
-        client.stopAndWait();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-}