You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:51 UTC

[09/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
new file mode 100644
index 0000000..65ceadb
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.ForwardingZKClient;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.RetryStrategy;
+import org.apache.twill.zookeeper.RetryStrategy.OperationType;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link ZKClient} that consults a {@link RetryStrategy} whenever an operation fails.
+ * It works by delegating each call to another {@link ZKClient} and listening for the result. If the result
+ * is a failure that is {@link RetryUtils#canRetry(Throwable) retryable}, the given {@link RetryStrategy}
+ * is asked for the delay before the next attempt. A negative delay means give up, in which case the failure
+ * is reported to the caller through the returned future.
+ */
+public final class FailureRetryZKClient extends ForwardingZKClient {
+
+  // Single-threaded scheduler, shared by all instances, on which delayed retries are run.
+  private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(
+                                                                Threads.createDaemonThreadFactory("retry-zkclient"));
+  private final RetryStrategy retryStrategy;
+
+  /**
+   * @param delegate the client that actually performs the operations
+   * @param retryStrategy consulted after each retryable failure to decide whether and when to retry
+   */
+  public FailureRetryZKClient(ZKClient delegate, RetryStrategy retryStrategy) {
+    super(delegate);
+    this.retryStrategy = retryStrategy;
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
+    // Same as the four-argument form with createParent set to true.
+    return create(path, data, createMode, true);
+  }
+
+  @Override
+  public OperationFuture<String> create(final String path, final byte[] data,
+                                        final CreateMode createMode, final boolean createParent) {
+
+    // No retry for any SEQUENTIAL node, as some algorithms depend on only one sequential node being created.
+    if (createMode == CreateMode.PERSISTENT_SEQUENTIAL || createMode == CreateMode.EPHEMERAL_SEQUENTIAL) {
+      return super.create(path, data, createMode, createParent);
+    }
+
+    final SettableOperationFuture<String> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+    Futures.addCallback(super.create(path, data, createMode, createParent),
+                        new OperationFutureCallback<String>(OperationType.CREATE, System.currentTimeMillis(),
+                                                            path, result, new Supplier<OperationFuture<String>>() {
+                          @Override
+                          public OperationFuture<String> get() {
+                            return FailureRetryZKClient.super.create(path, data, createMode, createParent);
+                          }
+                        }));
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path) {
+    return exists(path, null);
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(final String path, final Watcher watcher) {
+    final SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+    Futures.addCallback(super.exists(path, watcher),
+                        new OperationFutureCallback<Stat>(OperationType.EXISTS, System.currentTimeMillis(),
+                                                          path, result, new Supplier<OperationFuture<Stat>>() {
+                          @Override
+                          public OperationFuture<Stat> get() {
+                            return FailureRetryZKClient.super.exists(path, watcher);
+                          }
+                        }));
+    return result;
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path) {
+    return getChildren(path, null);
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(final String path, final Watcher watcher) {
+    final SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path,
+                                                                                        Threads.SAME_THREAD_EXECUTOR);
+    Futures.addCallback(super.getChildren(path, watcher),
+                        new OperationFutureCallback<NodeChildren>(OperationType.GET_CHILDREN,
+                                                                  System.currentTimeMillis(), path, result,
+                                                                  new Supplier<OperationFuture<NodeChildren>>() {
+                          @Override
+                          public OperationFuture<NodeChildren> get() {
+                            return FailureRetryZKClient.super.getChildren(path, watcher);
+                          }
+                        }));
+    return result;
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path) {
+    return getData(path, null);
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(final String path, final Watcher watcher) {
+    final SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+    Futures.addCallback(super.getData(path, watcher),
+                        new OperationFutureCallback<NodeData>(OperationType.GET_DATA, System.currentTimeMillis(),
+                                                              path, result, new Supplier<OperationFuture<NodeData>>() {
+                          @Override
+                          public OperationFuture<NodeData> get() {
+                            return FailureRetryZKClient.super.getData(path, watcher);
+                          }
+                        }));
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String path, byte[] data) {
+    return setData(path, data, -1);
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(final String dataPath, final byte[] data, final int version) {
+    final SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, Threads.SAME_THREAD_EXECUTOR);
+    Futures.addCallback(super.setData(dataPath, data, version),
+                        new OperationFutureCallback<Stat>(OperationType.SET_DATA, System.currentTimeMillis(),
+                                                          dataPath, result, new Supplier<OperationFuture<Stat>>() {
+                          @Override
+                          public OperationFuture<Stat> get() {
+                            return FailureRetryZKClient.super.setData(dataPath, data, version);
+                          }
+                        }));
+    return result;
+  }
+
+  @Override
+  public OperationFuture<String> delete(String path) {
+    return delete(path, -1);
+  }
+
+  @Override
+  public OperationFuture<String> delete(final String deletePath, final int version) {
+    final SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath,
+                                                                                  Threads.SAME_THREAD_EXECUTOR);
+    Futures.addCallback(super.delete(deletePath, version),
+                        new OperationFutureCallback<String>(OperationType.DELETE, System.currentTimeMillis(),
+                                                            deletePath, result,
+                                                            new Supplier<OperationFuture<String>>() {
+                          @Override
+                          public OperationFuture<String> get() {
+                            return FailureRetryZKClient.super.delete(deletePath, version);
+                          }
+                        }));
+    return result;
+  }
+
+  /**
+   * Callback to watch for operation result and trigger retry if necessary.
+   * The same callback instance is re-registered on every retry attempt, so the failure count and the
+   * start time of the first attempt are carried across attempts.
+   * @param <V> Type of operation result.
+   */
+  private final class OperationFutureCallback<V> implements FutureCallback<V> {
+
+    private final OperationType type;
+    private final long startTime;
+    private final String path;
+    private final SettableOperationFuture<V> result;
+    private final Supplier<OperationFuture<V>> retryAction;
+    private final AtomicInteger failureCount;
+
+    private OperationFutureCallback(OperationType type, long startTime, String path,
+                                    SettableOperationFuture<V> result, Supplier<OperationFuture<V>> retryAction) {
+      this.type = type;
+      this.startTime = startTime;
+      this.path = path;
+      this.result = result;
+      this.retryAction = retryAction;
+      this.failureCount = new AtomicInteger(0);
+    }
+
+    @Override
+    public void onSuccess(V result) {
+      this.result.set(result);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // Only report the failure to the caller when no further retry is going to happen.
+      if (!doRetry(t)) {
+        result.setException(t);
+      }
+    }
+
+    /**
+     * Schedules another attempt of the operation if the failure is retryable and the strategy allows it.
+     *
+     * @param t the failure of the latest attempt
+     * @return {@code true} if a retry has been scheduled, {@code false} if the operation should be given up
+     */
+    private boolean doRetry(Throwable t) {
+      if (!RetryUtils.canRetry(t)) {
+        return false;
+      }
+
+      // Determine the retry delay; a negative value from the strategy means no more retries.
+      long nextRetry = retryStrategy.nextRetry(failureCount.incrementAndGet(), startTime, type, path);
+      if (nextRetry < 0) {
+        return false;
+      }
+
+      // Schedule the retry.
+      SCHEDULER.schedule(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            Futures.addCallback(retryAction.get(), OperationFutureCallback.this);
+          } catch (RuntimeException e) {
+            // The retry could not even be issued. Nobody inspects the scheduled task's outcome, so fail
+            // the result here rather than leaving it pending forever.
+            result.setException(e);
+          }
+        }
+      }, nextRetry, TimeUnit.MILLISECONDS);
+
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
new file mode 100644
index 0000000..c4eed59
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.Executor;
+
+/**
+ * A {@link Service} that runs a ZooKeeper server inside the current process, storing its snapshots and
+ * transaction logs in a local directory. Instances are created through {@link #builder()}.
+ */
+public final class InMemoryZKServer implements Service {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryZKServer.class);
+
+  private final File dataDir;
+  private final int tickTime;
+  private final boolean autoClean;
+  private final int port;
+  // All Service methods of this class are forwarded to this delegate, which owns the server lifecycle.
+  private final Service delegateService = new AbstractIdleService() {
+    @Override
+    protected void startUp() throws Exception {
+      ZooKeeperServer zkServer = new ZooKeeperServer();
+      // The same directory is used for both arguments of FileTxnSnapLog.
+      FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
+      zkServer.setTxnLogFactory(ftxn);
+      zkServer.setTickTime(tickTime);
+
+      factory = ServerCnxnFactory.createFactory();
+      factory.configure(getAddress(port), -1);
+      factory.startup(zkServer);
+
+      LOG.info("In memory ZK started: {}", getConnectionStr());
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      try {
+        factory.shutdown();
+      } finally {
+        // Clean up even if shutting down the connection factory failed.
+        if (autoClean) {
+          cleanDir(dataDir);
+        }
+      }
+    }
+  };
+
+  // Assigned in startUp(); null until the service has been started.
+  private ServerCnxnFactory factory;
+
+  /**
+   * Creates a new {@link Builder} for configuring and building an {@link InMemoryZKServer}.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  private InMemoryZKServer(File dataDir, int tickTime, boolean autoClean, int port) {
+    if (dataDir == null) {
+      // No directory given: use a fresh temporary one and always clean it on shutdown.
+      dataDir = Files.createTempDir();
+      autoClean = true;
+    } else {
+      // isDirectory() is checked again after mkdirs(), presumably to tolerate a concurrent creation.
+      Preconditions.checkArgument(dataDir.isDirectory() || dataDir.mkdirs() || dataDir.isDirectory(),
+                                  "Failed to create data directory %s", dataDir);
+    }
+
+    this.dataDir = dataDir;
+    this.tickTime = tickTime;
+    this.autoClean = autoClean;
+    this.port = port;
+  }
+
+  /**
+   * Returns the address of the server in {@code host:port} form.
+   * Must only be called after the service has been started, as the address is not known before that.
+   */
+  public String getConnectionStr() {
+    InetSocketAddress addr = factory.getLocalAddress();
+    return String.format("%s:%d", addr.getHostName(), addr.getPort());
+  }
+
+  /**
+   * Returns the local address the server is bound to.
+   * Must only be called after the service has been started.
+   */
+  public InetSocketAddress getLocalAddress() {
+    return factory.getLocalAddress();
+  }
+
+  /**
+   * Builds the address to bind to, using the local host address. A negative port is mapped to port 0.
+   */
+  private InetSocketAddress getAddress(int port) {
+    try {
+      return new InetSocketAddress(InetAddress.getLocalHost(), port < 0 ? 0 : port);
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Recursively deletes everything inside the given directory. The directory itself is left in place.
+   * Deletion is best effort: a file that cannot be deleted is logged and skipped.
+   */
+  private void cleanDir(File dir) {
+    File[] files = dir.listFiles();
+    if (files == null) {
+      // Not a directory, or it could not be listed.
+      return;
+    }
+    for (File file : files) {
+      if (file.isDirectory()) {
+        cleanDir(file);
+      }
+      if (!file.delete()) {
+        LOG.warn("Failed to delete {}", file);
+      }
+    }
+  }
+
+  @Override
+  public ListenableFuture<State> start() {
+    return delegateService.start();
+  }
+
+  @Override
+  public State startAndWait() {
+    return delegateService.startAndWait();
+  }
+
+  @Override
+  public boolean isRunning() {
+    return delegateService.isRunning();
+  }
+
+  @Override
+  public State state() {
+    return delegateService.state();
+  }
+
+  @Override
+  public ListenableFuture<State> stop() {
+    return delegateService.stop();
+  }
+
+  @Override
+  public State stopAndWait() {
+    return delegateService.stopAndWait();
+  }
+
+  @Override
+  public void addListener(Listener listener, Executor executor) {
+    delegateService.addListener(listener, executor);
+  }
+
+  /**
+   * Builder for creating instance of {@link InMemoryZKServer}.
+   */
+  public static final class Builder {
+    private File dataDir;
+    private boolean autoCleanDataDir = false;
+    private int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
+    private int port = -1;
+
+    /**
+     * Sets the directory used for the server data. If never set, a temporary directory is created
+     * and its content is always removed when the server is stopped.
+     */
+    public Builder setDataDir(File dataDir) {
+      this.dataDir = dataDir;
+      return this;
+    }
+
+    /**
+     * Sets whether the content of the data directory is deleted when the server is stopped.
+     * Defaults to {@code false}.
+     */
+    public Builder setAutoCleanDataDir(boolean auto) {
+      this.autoCleanDataDir = auto;
+      return this;
+    }
+
+    /**
+     * Sets the tick time given to the ZooKeeper server. Defaults to
+     * {@link ZooKeeperServer#DEFAULT_TICK_TIME}.
+     */
+    public Builder setTickTime(int tickTime) {
+      this.tickTime = tickTime;
+      return this;
+    }
+
+    /**
+     * Sets the port to bind to. A negative value (the default) results in binding to port 0.
+     */
+    public Builder setPort(int port) {
+      this.port = port;
+      return this;
+    }
+
+    /**
+     * Creates the server from the current settings. The server is not started.
+     */
+    public InMemoryZKServer build() {
+      return new InMemoryZKServer(dataDir, tickTime, autoCleanDataDir, port);
+    }
+
+    private Builder() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
new file mode 100644
index 0000000..bc01f08
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import com.google.common.base.Preconditions;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test helper for killing the session of a ZooKeeper client so that failure handling can be exercised.
+ */
+public final class KillZKSession {
+
+  /**
+   * Private because this class only exposes static helpers and is not meant to be instantiated.
+   */
+  private KillZKSession() {}
+
+  /**
+   * Kills the session of a ZooKeeper client to simulate failure scenarios during testing.
+   * A second connection is opened using the session id and password of the given client and is closed
+   * again once it reports {@code SyncConnected}. The caller provides the longest time to wait for that.
+   *
+   * @param client the client whose session needs to be killed.
+   * @param connectionString connection string of the ZooKeeper quorum.
+   * @param maxMs maximum time, in milliseconds, to wait for the second connection to be established;
+   *              it is also used as the session timeout of that connection.
+   * @throws IOException When there is an I/O error.
+   * @throws InterruptedException When the calling thread is interrupted while waiting.
+   * @throws IllegalStateException When the second connection is not established within {@code maxMs}.
+   */
+  public static void kill(ZooKeeper client, String connectionString,
+                          int maxMs) throws IOException, InterruptedException {
+    final CountDownLatch connected = new CountDownLatch(1);
+    Watcher connectionWatcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (Event.KeeperState.SyncConnected == event.getState()) {
+          connected.countDown();
+        }
+      }
+    };
+
+    // Connect with the credentials of the existing session so that closing this handle ends that session.
+    ZooKeeper duplicate = new ZooKeeper(connectionString, maxMs, connectionWatcher,
+                                        client.getSessionId(), client.getSessionPasswd());
+    try {
+      boolean established = connected.await(maxMs, TimeUnit.MILLISECONDS);
+      Preconditions.checkState(established, "Fail to kill ZK connection.");
+    } finally {
+      duplicate.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
new file mode 100644
index 0000000..1a82e4b
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.ForwardingZKClient;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link ZKClient} that prefixes every path with a namespace before handing the call to its delegate.
+ * Paths returned by {@code create} and {@code delete} have the namespace stripped again, so callers only
+ * ever see paths relative to the namespace.
+ */
+public final class NamespaceZKClient extends ForwardingZKClient {
+  // This class extends ForwardingZKClient but still overrides every method; this is needed for the
+  // delegation logic in ZKClientServices to work.
+
+  private final String namespace;
+  private final ZKClient delegate;
+  private final String connectString;
+
+  /**
+   * @param delegate the client that performs the operations
+   * @param namespace the prefix prepended to every path; it is also appended to the delegate's connect string
+   */
+  public NamespaceZKClient(ZKClient delegate, String namespace) {
+    super(delegate);
+    this.namespace = namespace;
+    this.delegate = delegate;
+    this.connectString = delegate.getConnectString() + namespace;
+  }
+
+  @Override
+  public Long getSessionId() {
+    return delegate.getSessionId();
+  }
+
+  @Override
+  public String getConnectString() {
+    return connectString;
+  }
+
+  @Override
+  public void addConnectionWatcher(Watcher watcher) {
+    delegate.addConnectionWatcher(watcher);
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
+    return relayPath(delegate.create(namespace + path, data, createMode), this.<String>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
+                                        boolean createParent) {
+    return relayPath(delegate.create(namespace + path, data, createMode, createParent),
+                     this.<String>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path) {
+    return relayFuture(delegate.exists(namespace + path), this.<Stat>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
+    return relayFuture(delegate.exists(namespace + path, watcher), this.<Stat>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path) {
+    return relayFuture(delegate.getChildren(namespace + path), this.<NodeChildren>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
+    return relayFuture(delegate.getChildren(namespace + path, watcher), this.<NodeChildren>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path) {
+    return relayFuture(delegate.getData(namespace + path), this.<NodeData>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
+    return relayFuture(delegate.getData(namespace + path, watcher), this.<NodeData>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String path, byte[] data) {
+    return relayFuture(delegate.setData(namespace + path, data), this.<Stat>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+    return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
+  }
+
+  @Override
+  public OperationFuture<String> delete(String path) {
+    return relayPath(delegate.delete(namespace + path), this.<String>createFuture(path));
+  }
+
+  @Override
+  public OperationFuture<String> delete(String deletePath, int version) {
+    return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
+  }
+
+  /**
+   * Creates the future that is handed back to the caller. It is created with the namespaced path.
+   */
+  private <V> SettableOperationFuture<V> createFuture(String path) {
+    return SettableOperationFuture.create(namespace + path, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  /**
+   * Copies the outcome (value or failure) of the delegate's future into the future given to the caller.
+   *
+   * @return the {@code to} future
+   */
+  private <V> OperationFuture<V> relayFuture(final OperationFuture<V> from, final SettableOperationFuture<V> to) {
+    Futures.addCallback(from, new FutureCallback<V>() {
+      @Override
+      public void onSuccess(V result) {
+        to.set(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        to.setException(t);
+      }
+    });
+    return to;
+  }
+
+  /**
+   * Like {@link #relayFuture}, but for operations whose result is a path: the namespace prefix is removed
+   * from the path before it is given to the caller.
+   *
+   * @return the {@code to} future
+   */
+  private OperationFuture<String> relayPath(final OperationFuture<String> from,
+                                            final SettableOperationFuture<String> to) {
+    from.addListener(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          String path = from.get();
+          to.set(path.substring(namespace.length()));
+        } catch (Exception e) {
+          // An ExecutionException wraps the real failure, so expose its cause. Other exceptions, such as
+          // the one raised for a cancelled future, may carry no cause; fall back to the exception itself
+          // so that a null is never passed on and "to" always gets completed.
+          Throwable cause = e.getCause();
+          to.setException(cause == null ? e : cause);
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+    return to;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
new file mode 100644
index 0000000..fb42491
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Static helpers for deciding whether a failed operation may be retried.
+ */
+final class RetryUtils {
+
+  private RetryUtils() {
+  }
+
+  /**
+   * Tells whether an operation that failed with the given error code can be retried.
+   * @param code The error code of the operation.
+   * @return {@code true} for connection loss, operation timeout, session expired and session moved;
+   *         {@code false} for anything else.
+   */
+  public static boolean canRetry(KeeperException.Code code) {
+    if (code == null) {
+      return false;
+    }
+    switch (code) {
+      case CONNECTIONLOSS:
+      case OPERATIONTIMEOUT:
+      case SESSIONEXPIRED:
+      case SESSIONMOVED:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Tells whether an operation that failed with the given exception can be retried.
+   * @param t The exception raised by an operation.
+   * @return {@code true} if the exception is a {@link KeeperException} whose code can be retried.
+   */
+  public static boolean canRetry(Throwable t) {
+    if (!(t instanceof KeeperException)) {
+      return false;
+    }
+    KeeperException keeperException = (KeeperException) t;
+    return canRetry(keeperException.code());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java
new file mode 100644
index 0000000..181ca2b
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicMarkableReference;
+
+/**
+ * A {@link Watcher} wrapper that forwards events to a delegate and re-sets its own watch when the session expires.
+ */
+final class RewatchOnExpireWatcher implements Watcher {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewatchOnExpireWatcher.class);
+
+  enum ActionType {
+    EXISTS,
+    CHILDREN,
+    DATA
+  }
+
+  private final ZKClient client;
+  private final ActionType actionType;
+  private final String path;
+  private final Watcher delegate;
+  private final AtomicMarkableReference<Object> lastResult;  // mark is true while a result is recorded
+
+  RewatchOnExpireWatcher(ZKClient client, ActionType actionType, String path, Watcher delegate) {
+    this.client = client;
+    this.actionType = actionType;
+    this.path = path;
+    this.delegate = delegate;
+    this.lastResult = new AtomicMarkableReference<Object>(null, false);
+  }
+
+  /**
+   * Records the result of the operation that set this watcher; a no-op if a result has already been recorded.
+   */
+  void setLastResult(Object result) {
+    lastResult.compareAndSet(null, result, false, true);
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    if (delegate != null && event.getType() != Event.EventType.None) {  // None-typed events are not forwarded
+      try {
+        delegate.process(event);
+      } catch (Throwable t) {
+        LOG.error("Watcher throws exception.", t);
+      }
+    }
+
+    if (event.getState() != Event.KeeperState.Expired) {  // the watch is re-set only on session expiration
+      return;
+    }
+    switch (actionType) {
+      case EXISTS:
+        exists();
+        break;
+      case CHILDREN:
+        children();
+        break;
+      case DATA:
+        data();
+        break;
+    }
+  }
+
+  private void exists() {
+    Futures.addCallback(client.exists(path, this), new FutureCallback<Stat>() {
+      @Override
+      public void onSuccess(Stat stat) {
+        // Assumes all callbacks and watchers run on a single event thread, so there is no race condition here.
+        Object oldResult = lastResult.getReference();
+        lastResult.compareAndSet(oldResult, null, true, false);
+
+        if (stat != oldResult && (stat == null || !stat.equals(oldResult))) {
+          if (stat == null) {
+            // The previous stat was not null, so the node has been deleted
+            process(new WatchedEvent(Event.EventType.NodeDeleted, Event.KeeperState.SyncConnected, path));
+          } else if (oldResult == null) {
+            // The previous stat was null, so the node has been created
+            process(new WatchedEvent(Event.EventType.NodeCreated, Event.KeeperState.SyncConnected, path));
+          } else {
+            // Otherwise the stat differs, so something changed on the node
+            process(new WatchedEvent(Event.EventType.NodeDataChanged, Event.KeeperState.SyncConnected, path));
+          }
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        if (RetryUtils.canRetry(t)) {
+          exists();
+        } else {
+          lastResult.set(null, false);
+          LOG.error("Fail to re-set watch on exists for path " + path, t);
+        }
+      }
+    });
+  }
+
+  private void children() {
+    Futures.addCallback(client.getChildren(path, this), new FutureCallback<NodeChildren>() {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        Object oldResult = lastResult.getReference();
+        lastResult.compareAndSet(oldResult, null, true, false);
+
+        if (result.equals(oldResult)) {  // relies on the implementation's equals() - TODO confirm it is value-based
+          return;
+        }
+
+        if (!(oldResult instanceof NodeChildren)) {
+          // Unexpected type; note this is also reached when no previous result was recorded (oldResult is null)
+          LOG.error("The same watcher has been used for different event type.");
+          return;
+        }
+
+        NodeChildren oldNodeChildren = (NodeChildren) oldResult;
+        if (!result.getChildren().equals(oldNodeChildren.getChildren())) {
+          process(new WatchedEvent(Event.EventType.NodeChildrenChanged, Event.KeeperState.SyncConnected, path));
+        } else {
+          process(new WatchedEvent(Event.EventType.NodeDataChanged, Event.KeeperState.SyncConnected, path));
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        if (RetryUtils.canRetry(t)) {
+          children();
+          return;
+        }
+
+        lastResult.set(null, false);
+        if (t instanceof KeeperException) {
+          KeeperException.Code code = ((KeeperException) t).code();
+          if (code == KeeperException.Code.NONODE) {
+            // The node no longer exists, so report it to the delegate as deleted
+            process(new WatchedEvent(Event.EventType.NodeDeleted, Event.KeeperState.SyncConnected, path));
+            return;
+          }
+        }
+        LOG.error("Fail to re-set watch on getChildren for path " + path, t);
+      }
+    });
+  }
+
+  private void data() {
+    Futures.addCallback(client.getData(path, this), new FutureCallback<NodeData>() {
+      @Override
+      public void onSuccess(NodeData result) {
+        Object oldResult = lastResult.getReference();
+        lastResult.compareAndSet(oldResult, null, true, false);
+
+        if (!result.equals(oldResult)) {
+          // Any difference from the previous result is reported as a data change.
+          process(new WatchedEvent(Event.EventType.NodeDataChanged, Event.KeeperState.SyncConnected, path));
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        if (RetryUtils.canRetry(t)) {
+          data();
+          return;
+        }
+
+        lastResult.set(null, false);
+        if (t instanceof KeeperException) {
+          KeeperException.Code code = ((KeeperException) t).code();
+          if (code == KeeperException.Code.NONODE) {
+            // The node no longer exists, so report it to the delegate as deleted
+            process(new WatchedEvent(Event.EventType.NodeDeleted, Event.KeeperState.SyncConnected, path));
+            return;
+          }
+        }
+        LOG.error("Fail to re-set watch on getData for path " + path, t);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
new file mode 100644
index 0000000..402c153
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.internal.zookeeper.RewatchOnExpireWatcher.ActionType;
+import org.apache.twill.zookeeper.ForwardingZKClient;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A {@link ZKClient} decorator that rewatches automatically after a session expiration: each watcher
+ * passed to {@code exists}, {@code getChildren} or {@code getData} is wrapped in a
+ * {@link RewatchOnExpireWatcher}, which is where the rewatch logic mainly lives.
+ */
+public final class RewatchOnExpireZKClient extends ForwardingZKClient {
+
+  public RewatchOnExpireZKClient(ZKClient delegate) {
+    super(delegate);
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path, Watcher watcher) {
+    final RewatchOnExpireWatcher rewatcher = new RewatchOnExpireWatcher(this, ActionType.EXISTS, path, watcher);
+    OperationFuture<Stat> future = super.exists(path, rewatcher);
+    Futures.addCallback(future, new FutureCallback<Stat>() {
+      @Override
+      public void onSuccess(Stat stat) {
+        rewatcher.setLastResult(stat);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // Nothing to record; the caller sees the failure through the returned future.
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
+    final RewatchOnExpireWatcher rewatcher = new RewatchOnExpireWatcher(this, ActionType.CHILDREN, path, watcher);
+    OperationFuture<NodeChildren> future = super.getChildren(path, rewatcher);
+    Futures.addCallback(future, new FutureCallback<NodeChildren>() {
+      @Override
+      public void onSuccess(NodeChildren children) {
+        rewatcher.setLastResult(children);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // Nothing to record; the caller sees the failure through the returned future.
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path, Watcher watcher) {
+    final RewatchOnExpireWatcher rewatcher = new RewatchOnExpireWatcher(this, ActionType.DATA, path, watcher);
+    OperationFuture<NodeData> future = super.getData(path, rewatcher);
+    Futures.addCallback(future, new FutureCallback<NodeData>() {
+      @Override
+      public void onSuccess(NodeData data) {
+        rewatcher.setLastResult(data);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // Nothing to record; the caller sees the failure through the returned future.
+      }
+    });
+    return future;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
new file mode 100644
index 0000000..7544e56
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.zookeeper.OperationFuture;
+import com.google.common.util.concurrent.AbstractFuture;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
+/**
+ * An implementation for {@link OperationFuture} that allows setting result directly.
+ * Also, all listener callback will be fired from the given executor.
+ */
+public final class SettableOperationFuture<V> extends AbstractFuture<V> implements OperationFuture<V> {
+
+  private final String requestPath;
+  private final Executor executor;
+
+  public static <V> SettableOperationFuture<V> create(String path, Executor executor) {
+    return new SettableOperationFuture<V>(path, executor);
+  }
+
+  private SettableOperationFuture(String requestPath, Executor executor) {
+    this.requestPath = requestPath;
+    this.executor = executor;
+  }
+
+  @Override
+  public String getRequestPath() {
+    return requestPath;
+  }
+
+  @Override
+  public void addListener(final Runnable listener, final Executor exec) {
+    super.addListener(new Runnable() {
+      @Override
+      public void run() {
+        exec.execute(listener);
+      }
+    }, executor);
+  }
+
+  @Override
+  public boolean setException(Throwable throwable) {
+    return super.setException(throwable);
+  }
+
+  @Override
+  public boolean set(@Nullable V value) {
+    return super.set(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/package-info.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/package-info.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/package-info.java
new file mode 100644
index 0000000..d2afa11
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal classes for zookeeper.
+ */
+package org.apache.twill.internal.zookeeper;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
new file mode 100644
index 0000000..3f3003d
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link ZKClient} that forwards every operation to a delegate; the short overloads call the full ones on this.
+ */
+public abstract class ForwardingZKClient implements ZKClient {
+
+  private final ZKClient delegate;
+
+  protected ForwardingZKClient(ZKClient delegate) {
+    this.delegate = delegate;
+  }
+
+  public final ZKClient getDelegate() {
+    return delegate;
+  }
+
+  @Override
+  public Long getSessionId() {
+    return delegate.getSessionId();
+  }
+
+  @Override
+  public String getConnectString() {
+    return delegate.getConnectString();
+  }
+
+  @Override
+  public void addConnectionWatcher(Watcher watcher) {
+    delegate.addConnectionWatcher(watcher);
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
+    return create(path, data, createMode, true);  // createParent defaults to true
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
+                                        boolean createParent) {
+    return delegate.create(path, data, createMode, createParent);
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path) {
+    return exists(path, null);  // no watcher; dispatches to the (possibly overridden) two-argument form
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
+    return delegate.exists(path, watcher);
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path) {
+    return getChildren(path, null);  // no watcher; dispatches to the (possibly overridden) two-argument form
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
+    return delegate.getChildren(path, watcher);
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path) {
+    return getData(path, null);  // no watcher; dispatches to the (possibly overridden) two-argument form
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
+    return delegate.getData(path, watcher);
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String path, byte[] data) {
+    return setData(path, data, -1);  // -1 presumably means "any version" - confirm against ZKClient
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+    return delegate.setData(dataPath, data, version);
+  }
+
+  @Override
+  public OperationFuture<String> delete(String path) {
+    return delete(path, -1);  // -1 presumably means "any version" - confirm against ZKClient
+  }
+
+  @Override
+  public OperationFuture<String> delete(String deletePath, int version) {
+    return delegate.delete(deletePath, version);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java
new file mode 100644
index 0000000..10391b2
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.zookeeper.ZooKeeper;
+
+import java.util.concurrent.Executor;
+
+/**
+ * A {@link ZKClientService} that forwards every client and service call to a delegate {@link ZKClientService}.
+ */
+public abstract class ForwardingZKClientService extends ForwardingZKClient implements ZKClientService {
+
+  private final ZKClientService delegate;
+
+  protected ForwardingZKClientService(ZKClientService delegate) {
+    super(delegate);
+    this.delegate = delegate;
+  }
+
+  @Override
+  public Supplier<ZooKeeper> getZooKeeperSupplier() {
+    return delegate.getZooKeeperSupplier();
+  }
+
+  @Override
+  public ListenableFuture<State> start() {
+    return delegate.start();
+  }
+
+  @Override
+  public State startAndWait() {
+    return Futures.getUnchecked(start());  // goes through this.start(), so a subclass override applies
+  }
+
+  @Override
+  public boolean isRunning() {
+    return delegate.isRunning();
+  }
+
+  @Override
+  public State state() {
+    return delegate.state();
+  }
+
+  @Override
+  public ListenableFuture<State> stop() {
+    return delegate.stop();
+  }
+
+  @Override
+  public State stopAndWait() {
+    return Futures.getUnchecked(stop());  // goes through this.stop(), so a subclass override applies
+  }
+
+  @Override
+  public void addListener(Listener listener, Executor executor) {
+    delegate.addListener(listener, executor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeChildren.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeChildren.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeChildren.java
new file mode 100644
index 0000000..b432c01
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeChildren.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * Represents the result of a call to {@link ZKClient#getChildren(String, org.apache.zookeeper.Watcher)}.
+ */
+public interface NodeChildren {
+
+  /**
+   * @return The {@link Stat} of the node.
+   */
+  Stat getStat();
+
+  /**
+   * @return The names of the node's children.
+   */
+  List<String> getChildren();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
new file mode 100644
index 0000000..ac15957
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.data.Stat;
+
+import javax.annotation.Nullable;
+
+/**
+ * Represents the result of a call to {@link ZKClient#getData(String, org.apache.zookeeper.Watcher)}.
+ */
+public interface NodeData {
+
+  /**
+   * @return The {@link Stat} of the node.
+   */
+  Stat getStat();
+
+  /**
+   * @return The data stored in the node, or {@code null} if there is no data.
+   */
+  @Nullable
+  byte[] getData();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/OperationFuture.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/OperationFuture.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/OperationFuture.java
new file mode 100644
index 0000000..fafaa7a
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/OperationFuture.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A {@link ListenableFuture} that also provides the requested path of an operation.
+ *
+ * @param <V> The result type returned by this Future's {@link #get()} method.
+ */
+public interface OperationFuture<V> extends ListenableFuture<V> {
+
+  /**
+   * @return The path being requested for the ZooKeeper operation.
+   */
+  String getRequestPath();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategies.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategies.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategies.java
new file mode 100644
index 0000000..56474b7
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategies.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Factory for creating common {@link RetryStrategy} implementations.
+ */
+public final class RetryStrategies {
+
+  /**
+   * @return A {@link RetryStrategy} that doesn't do any retry.
+   */
+  public static RetryStrategy noRetry() {
+    return new RetryStrategy() {
+      @Override
+      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
+        return -1;
+      }
+    };
+  }
+
+  /**
+   * Creates a {@link RetryStrategy} that retries at most the given number of times, with the actual
+   * delay behavior delegated to another {@link RetryStrategy}.
+   * @param limit Maximum number of retries allowed.
+   * @param strategy When failure count is less than or equal to the limit, this strategy will be called.
+   * @return A {@link RetryStrategy}.
+   */
+  public static RetryStrategy limit(final int limit, final RetryStrategy strategy) {
+    Preconditions.checkArgument(limit >= 0, "limit must be >= 0");
+    return new RetryStrategy() {
+      @Override
+      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
+        return (failureCount <= limit) ? strategy.nextRetry(failureCount, startTime, type, path) : -1L;
+      }
+    };
+  }
+
+  /**
+   * Creates a {@link RetryStrategy} that imposes a fixed delay between retries.
+   * @param delay delay time
+   * @param delayUnit {@link TimeUnit} for the delay.
+   * @return A {@link RetryStrategy}.
+   */
+  public static RetryStrategy fixDelay(final long delay, final TimeUnit delayUnit) {
+    Preconditions.checkArgument(delay >= 0, "delay must be >= 0");
+    return new RetryStrategy() {
+      @Override
+      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
+        return TimeUnit.MILLISECONDS.convert(delay, delayUnit);
+      }
+    };
+  }
+
+  /**
+   * Creates a {@link RetryStrategy} that doubles the delay after every failure, saturating at {@code maxDelay}.
+   * @param baseDelay delay to start with (used when {@code failureCount} is 1).
+   * @param maxDelay cap of the delay.
+   * @param delayUnit {@link TimeUnit} for the delays.
+   * @return A {@link RetryStrategy}.
+   */
+  public static RetryStrategy exponentialDelay(final long baseDelay, final long maxDelay, final TimeUnit delayUnit) {
+    Preconditions.checkArgument(baseDelay >= 0, "base delay must be >= 0");
+    Preconditions.checkArgument(maxDelay >= 0, "max delay must be >= 0");
+    return new RetryStrategy() {
+      @Override
+      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
+        // Compare against maxDelay shifted right instead of multiplying, so the doubling can never overflow.
+        int shift = Math.max(failureCount - 1, 0);
+        boolean capped = baseDelay > 0 && (shift >= Long.SIZE - 1 || baseDelay > (maxDelay >> shift));
+        return TimeUnit.MILLISECONDS.convert(capped ? maxDelay : (baseDelay << shift), delayUnit);
+      }
+    };
+  }
+
+  /**
+   * Creates a {@link RetryStrategy} that will retry until maximum amount of time has been passed since the request,
+   * with the actual delay behavior delegated to another {@link RetryStrategy}.
+   * @param maxElapseTime Maximum amount of time until giving up retry.
+   * @param timeUnit {@link TimeUnit} for the max elapse time.
+   * @param strategy When time elapsed is less than or equal to the limit, this strategy will be called.
+   * @return A {@link RetryStrategy}.
+   */
+  public static RetryStrategy timeLimit(long maxElapseTime, TimeUnit timeUnit, final RetryStrategy strategy) {
+    Preconditions.checkArgument(maxElapseTime >= 0, "max elapse time must be >= 0");
+    final long maxElapseMs = TimeUnit.MILLISECONDS.convert(maxElapseTime, timeUnit);
+    return new RetryStrategy() {
+      @Override
+      public long nextRetry(int failureCount, long startTime, OperationType type, String path) {
+        long elapseTime = System.currentTimeMillis() - startTime;
+        return elapseTime <= maxElapseMs ? strategy.nextRetry(failureCount, startTime, type, path) : -1L;
+      }
+    };
+  }
+
+  private RetryStrategies() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
new file mode 100644
index 0000000..3301e8a
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+/**
+ * Provides the strategy to use for operation retries: given information about a failed operation,
+ * it tells how long to wait before the next attempt, or that the operation should be aborted.
+ */
+public interface RetryStrategy {
+
+  /**
+   * Defines the type of ZooKeeper operation that triggers a retry.
+   */
+  enum OperationType {
+    CREATE,
+    EXISTS,
+    GET_CHILDREN,
+    GET_DATA,
+    SET_DATA,
+    DELETE
+  }
+
+  /**
+   * Returns the number of milliseconds to wait before retrying the operation.
+   *
+   * @param failureCount Number of times that the request has failed.
+   * @param startTime Timestamp in milliseconds at which the request started.
+   * @param type Type of the operation that was attempted.
+   * @param path The path that the operation is acting on.
+   * @return Number of milliseconds to wait before retrying the operation. Returning {@code 0} means
+   *         retry it immediately, while a negative value means abort the operation.
+   */
+  long nextRetry(int failureCount, long startTime, OperationType type, String path);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
new file mode 100644
index 0000000..d60182e
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A ZooKeeper client that provides asynchronous ZooKeeper operations. Every operation returns an
+ * {@link OperationFuture} through which the result (or the failure) is delivered.
+ */
+public interface ZKClient {
+
+  /**
+   * Returns the current ZooKeeper session ID of this client.
+   * If this ZKClient is not connected, {@code null} is returned.
+   */
+  Long getSessionId();
+
+  /**
+   * Returns the connection string used for connecting to ZooKeeper.
+   */
+  String getConnectString();
+
+  /**
+   * Adds a {@link Watcher} that will be called whenever the connection state changes.
+   * @param watcher The watcher to set.
+   */
+  void addConnectionWatcher(Watcher watcher);
+
+  /**
+   * Same as calling
+   * {@link #create(String, byte[], org.apache.zookeeper.CreateMode, boolean) create(path, data, createMode, true)}.
+   *
+   * @see #create(String, byte[], org.apache.zookeeper.CreateMode, boolean)
+   */
+  OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode);
+
+  /**
+   * Creates a path in ZooKeeper, with the given data and create mode.
+   *
+   * @param path Path to be created
+   * @param data Data to be stored in the node, or {@code null} if no data to store.
+   * @param createMode The {@link org.apache.zookeeper.CreateMode} for the node.
+   * @param createParent If {@code true} and parent nodes are missing, it will create all parent nodes as normal
+   *                     persistent nodes before creating the requested node.
+   * @return A {@link OperationFuture} that will be completed when the
+   *         creation is done. If there is an error during creation, it will be reflected as an error in the future.
+   */
+  OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent);
+
+  /**
+   * Checks if the path exists. Same as calling
+   * {@link #exists(String, org.apache.zookeeper.Watcher) exists(path, null)}.
+   *
+   * @see #exists(String, org.apache.zookeeper.Watcher)
+   */
+  OperationFuture<Stat> exists(String path);
+
+  /**
+   * Checks if the given path exists and leaves a watcher on the node for watching creation/deletion/data changes
+   * on the node.
+   *
+   * @param path The path to check for existence.
+   * @param watcher Watcher for watching changes, or {@code null} if no watcher to set.
+   * @return A {@link OperationFuture} that will be completed when the exists check is done. If the path
+   *         does exist, the node {@link Stat} is set into the future. If the path doesn't exist,
+   *         a {@code null} value is set into the future.
+   */
+  OperationFuture<Stat> exists(String path, @Nullable Watcher watcher);
+
+  /**
+   * Gets the list of children nodes under the given path. Same as calling
+   * {@link #getChildren(String, org.apache.zookeeper.Watcher) getChildren(path, null)}.
+   *
+   * @see #getChildren(String, org.apache.zookeeper.Watcher)
+   */
+  OperationFuture<NodeChildren> getChildren(String path);
+
+  /**
+   * Gets the list of children nodes under the given path and leaves a watcher on the node for watching node
+   * deletion and children nodes creation/deletion.
+   *
+   * @param path The path to fetch for children nodes
+   * @param watcher Watcher for watching changes, or {@code null} if no watcher to set.
+   * @return A {@link OperationFuture} that will be completed when the getChildren call is done, with the result
+   *         given as {@link NodeChildren}. If there is an error, it will be reflected as an error in the future.
+   */
+  OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher);
+
+  /**
+   * Gets the data stored in the given path. Same as calling
+   * {@link #getData(String, org.apache.zookeeper.Watcher) getData(path, null)}.
+   *
+   * @see #getData(String, org.apache.zookeeper.Watcher)
+   */
+  OperationFuture<NodeData> getData(String path);
+
+  /**
+   * Gets the data stored in the given path and leaves a watcher on the node for watching deletion/data changes on
+   * the node.
+   *
+   * @param path The path to get data from.
+   * @param watcher Watcher for watching changes, or {@code null} if no watcher to set.
+   * @return A {@link OperationFuture} that will be completed when the getData call is done, with the result
+   *         given as {@link NodeData}. If there is an error, it will be reflected as an error in the future.
+   */
+  OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher);
+
+  /**
+   * Sets the data for the given path without matching version. Same as calling
+   * {@link #setData(String, byte[], int) setData(path, data, -1)}.
+   *
+   * @see #setData(String, byte[], int)
+   */
+  OperationFuture<Stat> setData(String path, byte[] data);
+
+  /**
+   * Sets the data for the given path if the node matches the given version. If the version given is {@code -1},
+   * it matches any version.
+   *
+   * @param dataPath The path to set data to.
+   * @param data Data to be set.
+   * @param version Matching version.
+   * @return A {@link OperationFuture} that will be completed when the setData call is done, with node {@link Stat}
+   *         given as the future result. If there is an error, it will be reflected as an error in the future.
+   */
+  OperationFuture<Stat> setData(String dataPath, byte[] data, int version);
+
+  /**
+   * Deletes the node of the given path without matching version. Same as calling
+   * {@link #delete(String, int) delete(path, -1)}.
+   *
+   * @see #delete(String, int)
+   */
+  OperationFuture<String> delete(String path);
+
+  /**
+   * Deletes the node of the given path if it matches the given version. If the version given is {@code -1},
+   * it matches any version.
+   *
+   * @param deletePath The path of the node to delete.
+   * @param version Matching version.
+   * @return A {@link OperationFuture} that will be completed when the delete call is done, with node path
+   *         given as the future result. If there is an error, it will be reflected as an error in the future.
+   */
+  OperationFuture<String> delete(String deletePath, int version);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
new file mode 100644
index 0000000..63f27fb
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.twill.internal.zookeeper.DefaultZKClientService;
+import com.google.common.base.Supplier;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Service;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * A {@link ZKClient} that extends from {@link Service} to provide lifecycle management functions.
+ * The {@link #start()} method needs to be called before calling any other method on this interface.
+ * When the client is no longer needed, call {@link #stop()} to release any resources that it holds.
+ */
+public interface ZKClientService extends ZKClient, Service {
+
+  /**
+   * Returns a {@link Supplier} of {@link ZooKeeper} that gives the current {@link ZooKeeper} in use at the moment
+   * when {@link com.google.common.base.Supplier#get()} gets called.
+   *
+   * @return A {@link Supplier Supplier&lt;ZooKeeper&gt;}
+   */
+  Supplier<ZooKeeper> getZooKeeperSupplier();
+
+  /**
+   * Builder for creating an implementation of {@link ZKClientService}.
+   * The default client timeout is 10000ms.
+   */
+  public static final class Builder {
+
+    // Connection string handed to the created client.
+    private final String connectStr;
+    // Session timeout in milliseconds.
+    private int timeout = 10000;
+    // Optional connection watcher; stays null unless setConnectionWatcher is called.
+    private Watcher connectionWatcher;
+    // NOTE(review): this field is never read within this builder and is not passed on by build().
+    private Multimap<String, ACL> acls = HashMultimap.create();
+
+    /**
+     * Creates a {@link Builder} with the given ZooKeeper connection string.
+     * @param connectStr The connection string.
+     * @return A new instance of Builder.
+     */
+    public static Builder of(String connectStr) {
+      return new Builder(connectStr);
+    }
+
+    /**
+     * Sets the client timeout to the given milliseconds.
+     * @param timeout timeout in milliseconds.
+     * @return This builder
+     */
+    public Builder setSessionTimeout(int timeout) {
+      this.timeout = timeout;
+      return this;
+    }
+
+    /**
+     * Sets a {@link Watcher} that will be called whenever the connection state changes.
+     * @param watcher The watcher to set.
+     * @return This builder.
+     */
+    public Builder setConnectionWatcher(Watcher watcher) {
+      this.connectionWatcher = watcher;
+      return this;
+    }
+
+    /**
+     * Creates an instance of {@link ZKClientService} with the settings of this builder
+     * (connection string, session timeout and connection watcher).
+     * @return A new instance of {@link ZKClientService}.
+     */
+    public ZKClientService build() {
+      return new DefaultZKClientService(connectStr, timeout, connectionWatcher);
+    }
+
+    // Instances are obtained through of(String).
+    private Builder(String connectStr) {
+      this.connectStr = connectStr;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
new file mode 100644
index 0000000..cc38c76
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import javax.annotation.Nullable;
+
+/**
+ * Provides static factory method to create {@link ZKClientService} with modified behaviors.
+ */
+public final class ZKClientServices {
+
+  /**
+   * Creates a {@link ZKClientService} from the given {@link ZKClient} if the given {@link ZKClient} is an instance of
+   * {@link ZKClientService} or is a {@link ForwardingZKClient} that eventually trace back to a delegate of type
+   * {@link ZKClientService}. If such a {@link ZKClientService} instance is found, this method returns
+   * an instance by invoking {@link #delegate(ZKClient, ZKClientService)} with the given {@link ZKClient} and
+   * the {@link ZKClientService} found respectively.
+   *
+   * @param client The {@link ZKClient}.
+   * @return A {@link ZKClientService}.
+   * @throws IllegalArgumentException If no {@link ZKClientService} is found.
+   */
+  public static ZKClientService delegate(ZKClient client) {
+    ZKClient zkClient = client;
+    while (!(zkClient instanceof ZKClientService) && zkClient instanceof ForwardingZKClient) {
+      zkClient = ((ForwardingZKClient) zkClient).getDelegate();
+    }
+    if (zkClient instanceof ZKClientService) {
+      return delegate(client, (ZKClientService) zkClient);
+    }
+    throw new IllegalArgumentException("No ZKClientService found from the delegation hierarchy");
+  }
+
+  /**
+   * Creates a {@link ZKClientService} that for all {@link ZKClient} methods would be delegated to another
+   * {@link ZKClient}, while methods for {@link ZKClientService} would be delegated to another {@link ZKClientService},
+   * which the given {@link ZKClient} and {@link ZKClientService} could be different instances.
+   *
+   * @param client The {@link ZKClient} for delegation
+   * @param clientService The {@link ZKClientService} for delegation.
+   * @return A {@link ZKClientService}.
+   */
+  public static ZKClientService delegate(final ZKClient client, ZKClientService clientService) {
+    return new ForwardingZKClientService(clientService) {
+
+      @Override
+      public Long getSessionId() {
+        return client.getSessionId();
+      }
+
+      @Override
+      public String getConnectString() {
+        return client.getConnectString();
+      }
+
+      @Override
+      public void addConnectionWatcher(Watcher watcher) {
+        client.addConnectionWatcher(watcher);
+      }
+
+      @Override
+      public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
+        return client.create(path, data, createMode);
+      }
+
+      @Override
+      public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
+                                            boolean createParent) {
+        return client.create(path, data, createMode, createParent);
+      }
+
+      @Override
+      public OperationFuture<Stat> exists(String path) {
+        return client.exists(path);
+      }
+
+      @Override
+      public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
+        return client.exists(path, watcher);
+      }
+
+      @Override
+      public OperationFuture<NodeChildren> getChildren(String path) {
+        return client.getChildren(path);
+      }
+
+      @Override
+      public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
+        return client.getChildren(path, watcher);
+      }
+
+      @Override
+      public OperationFuture<NodeData> getData(String path) {
+        return client.getData(path);
+      }
+
+      @Override
+      public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
+        return client.getData(path, watcher);
+      }
+
+      @Override
+      public OperationFuture<Stat> setData(String path, byte[] data) {
+        return client.setData(path, data);
+      }
+
+      @Override
+      public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+        return client.setData(dataPath, data, version);
+      }
+
+      @Override
+      public OperationFuture<String> delete(String path) {
+        return client.delete(path);
+      }
+
+      @Override
+      public OperationFuture<String> delete(String deletePath, int version) {
+        return client.delete(deletePath, version);
+      }
+    };
+  }
+
+  private ZKClientServices() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClients.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClients.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClients.java
new file mode 100644
index 0000000..f67c1bd
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClients.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.twill.internal.zookeeper.FailureRetryZKClient;
+import org.apache.twill.internal.zookeeper.NamespaceZKClient;
+import org.apache.twill.internal.zookeeper.RewatchOnExpireZKClient;
+
+/**
+ * Provides static factory methods that wrap a {@link ZKClient} in another {@link ZKClient}
+ * implementation with modified behaviors.
+ */
+public final class ZKClients {
+
+  /**
+   * Creates a {@link ZKClient} that will perform auto re-watch on all existing watches
+   * when reconnection happens after session expiration. All {@link org.apache.zookeeper.Watcher Watchers}
+   * set through the returned {@link ZKClient} would not receive any connection events.
+   *
+   * @param client The {@link ZKClient} for operations delegation.
+   * @return A {@link ZKClient} that will do auto re-watch on all methods that accept a
+   *        {@link org.apache.zookeeper.Watcher} upon session expiration.
+   */
+  public static ZKClient reWatchOnExpire(ZKClient client) {
+    return new RewatchOnExpireZKClient(client);
+  }
+
+  /**
+   * Creates a {@link ZKClient} that will retry on interim failures (e.g. connection loss, session expiration)
+   * based on the given {@link RetryStrategy}.
+   *
+   * @param client The {@link ZKClient} for operations delegation.
+   * @param retryStrategy The {@link RetryStrategy} to be invoked when there is an operation failure.
+   * @return A {@link ZKClient}.
+   */
+  public static ZKClient retryOnFailure(ZKClient client, RetryStrategy retryStrategy) {
+    return new FailureRetryZKClient(client, retryStrategy);
+  }
+
+
+  /**
+   * Creates a {@link ZKClient} by wrapping the given client in a {@link NamespaceZKClient}.
+   * NOTE(review): presumably every path used through the returned client is resolved under the
+   * given namespace; confirm against {@link NamespaceZKClient}.
+   *
+   * @param zkClient The {@link ZKClient} for operations delegation.
+   * @param namespace The namespace handed to the {@link NamespaceZKClient}.
+   * @return A {@link ZKClient}.
+   */
+  public static ZKClient namespace(ZKClient zkClient, String namespace) {
+    return new NamespaceZKClient(zkClient, namespace);
+  }
+
+  // Private constructor; only the static factory methods are meant to be used.
+  private ZKClients() {
+  }
+}