You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/18 22:37:45 UTC

git commit: (TWILL-74) Added authentication and ACL support to ZKClient.

Repository: incubator-twill
Updated Branches:
  refs/heads/master 2c3cf3968 -> 9393df80b


(TWILL-74) Added authentication and ACL support to ZKClient.

* New method, addAuthInfo, added to ZKClientService.Builder for creating ZKClient with authentication.
* New methods (create, getACL, setACL) to support ACL are added to ZKClient.
* Refactor ZKClient hierarchy to have an AbstractZKClient to ensure correct behavior of method delegation for child classes.

Signed-off-by: Terence Yim <te...@continuuity.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/9393df80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/9393df80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/9393df80

Branch: refs/heads/master
Commit: 9393df80ba1c0b83a162c250c64ebd812d9db214
Parents: 2c3cf39
Author: Terence Yim <te...@continuuity.com>
Authored: Thu Apr 17 18:40:36 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Fri Apr 18 13:36:41 2014 -0700

----------------------------------------------------------------------
 .../twill/internal/zookeeper/BasicACLData.java  |  48 ++++++
 .../zookeeper/DefaultZKClientService.java       | 145 ++++++++++++-------
 .../zookeeper/FailureRetryZKClient.java         |  72 ++++-----
 .../internal/zookeeper/NamespaceZKClient.java   |  44 ++----
 .../zookeeper/RewatchOnExpireZKClient.java      |   6 +-
 .../org/apache/twill/zookeeper/ACLData.java     |  39 +++++
 .../twill/zookeeper/AbstractZKClient.java       |  80 ++++++++++
 .../twill/zookeeper/ForwardingZKClient.java     |  45 ++----
 .../org/apache/twill/zookeeper/NodeData.java    |   2 +-
 .../apache/twill/zookeeper/RetryStrategy.java   |   4 +-
 .../org/apache/twill/zookeeper/ZKClient.java    |  58 +++++++-
 .../apache/twill/zookeeper/ZKClientService.java |  19 ++-
 .../twill/zookeeper/ZKClientServices.java       |  46 ++----
 .../apache/twill/zookeeper/ZKClientTest.java    |  75 +++++++++-
 14 files changed, 480 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicACLData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicACLData.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicACLData.java
new file mode 100644
index 0000000..d31de8b
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicACLData.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * A straightforward implementation of {@link ACLData}.
+ */
+final class BasicACLData implements ACLData {
+
+  private final List<ACL> acl;
+  private final Stat stat;
+
+  BasicACLData(List<ACL> acl, Stat stat) {
+    this.acl = acl;
+    this.stat = stat;
+  }
+
+  @Override
+  public List<ACL> getACL() {
+    return acl;
+  }
+
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 84b3a8d..7c9bd08 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -17,15 +17,20 @@
  */
 package org.apache.twill.internal.zookeeper;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.AbstractService;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
 import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.twill.zookeeper.AbstractZKClient;
 import org.apache.twill.zookeeper.NodeChildren;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.OperationFuture;
@@ -43,7 +48,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -56,33 +63,37 @@ import javax.annotation.Nullable;
 /**
  * The base implementation of {@link ZKClientService}.
  */
-public final class DefaultZKClientService implements ZKClientService {
+public final class DefaultZKClientService extends AbstractZKClient implements ZKClientService {
 
   private static final Logger LOG = LoggerFactory.getLogger(DefaultZKClientService.class);
 
   private final String zkStr;
   private final int sessionTimeout;
   private final List<Watcher> connectionWatchers;
+  private final Multimap<String, byte[]> authInfos;
   private final AtomicReference<ZooKeeper> zooKeeper;
-  private final Function<String, List<ACL>> aclMapper;
   private final Service serviceDelegate;
   private ExecutorService eventExecutor;
 
+  /**
+   * Creates a new instance.
+   * @deprecated Use {@link ZKClientService.Builder} instead.
+   */
+  @Deprecated
+  @SuppressWarnings("unused")
   public DefaultZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher) {
+    this(zkStr, sessionTimeout, connectionWatcher, ImmutableMultimap.<String, byte[]>of());
+  }
+
+  public DefaultZKClientService(String zkStr, int sessionTimeout,
+                                Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
     this.zkStr = zkStr;
     this.sessionTimeout = sessionTimeout;
     this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+    this.authInfos = copyAuthInfo(authInfos);
     addConnectionWatcher(connectionWatcher);
 
     this.zooKeeper = new AtomicReference<ZooKeeper>();
-
-    // TODO (terence): Add ACL
-    aclMapper = new Function<String, List<ACL>>() {
-      @Override
-      public List<ACL> apply(String input) {
-        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
-      }
-    };
     serviceDelegate = new ServiceDelegate();
   }
 
@@ -105,23 +116,19 @@ public final class DefaultZKClientService implements ZKClientService {
   }
 
   @Override
-  public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
-    return create(path, data, createMode, true);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data,
-                                        CreateMode createMode, boolean createParent) {
-    return doCreate(path, data, createMode, createParent, false);
+  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
+                                        boolean createParent, Iterable<ACL> acl) {
+    return doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false);
   }
 
   private OperationFuture<String> doCreate(final String path,
-                                        @Nullable final byte[] data,
-                                        final CreateMode createMode,
-                                        final boolean createParent,
-                                        final boolean ignoreNodeExists) {
+                                           @Nullable final byte[] data,
+                                           final CreateMode createMode,
+                                           final boolean createParent,
+                                           final List<ACL> acl,
+                                           final boolean ignoreNodeExists) {
     final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
-    getZooKeeper().create(path, data, aclMapper.apply(path), createMode, Callbacks.STRING, createFuture);
+    getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
     if (!createParent) {
       return createFuture;
     }
@@ -148,14 +155,14 @@ public final class DefaultZKClientService implements ZKClientService {
           result.setException(t);
           return;
         }
-        // Watch for parent creation complete
-        Futures.addCallback(
-          doCreate(parentPath, null, CreateMode.PERSISTENT, createParent, true), new FutureCallback<String>() {
+        // Watch for parent creation complete. Parent is created with the unsafe ACL.
+        Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT,
+                                     true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() {
           @Override
           public void onSuccess(String parentPath) {
             // Create the requested path again
             Futures.addCallback(
-              doCreate(path, data, createMode, false, ignoreNodeExists), new FutureCallback<String>() {
+              doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
               @Override
               public void onSuccess(String pathResult) {
                 result.set(pathResult);
@@ -219,11 +226,6 @@ public final class DefaultZKClientService implements ZKClientService {
   }
 
   @Override
-  public OperationFuture<Stat> exists(String path) {
-    return exists(path, null);
-  }
-
-  @Override
   public OperationFuture<Stat> exists(String path, Watcher watcher) {
     SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
     getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
@@ -231,11 +233,6 @@ public final class DefaultZKClientService implements ZKClientService {
   }
 
   @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return getChildren(path, null);
-  }
-
-  @Override
   public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
     SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
     getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
@@ -243,11 +240,6 @@ public final class DefaultZKClientService implements ZKClientService {
   }
 
   @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return getData(path, null);
-  }
-
-  @Override
   public OperationFuture<NodeData> getData(String path, Watcher watcher) {
     SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
     getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
@@ -256,11 +248,6 @@ public final class DefaultZKClientService implements ZKClientService {
   }
 
   @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return setData(path, data, -1);
-  }
-
-  @Override
   public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
     SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
     getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
@@ -268,11 +255,6 @@ public final class DefaultZKClientService implements ZKClientService {
   }
 
   @Override
-  public OperationFuture<String> delete(String path) {
-    return delete(path, -1);
-  }
-
-  @Override
   public OperationFuture<String> delete(String deletePath, int version) {
     SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
     getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
@@ -280,6 +262,20 @@ public final class DefaultZKClientService implements ZKClientService {
   }
 
   @Override
+  public OperationFuture<ACLData> getACL(String path) {
+    SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+    SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result);
+    return result;
+  }
+
+  @Override
   public Supplier<ZooKeeper> getZooKeeperSupplier() {
     return new Supplier<ZooKeeper>() {
       @Override
@@ -359,6 +355,20 @@ public final class DefaultZKClientService implements ZKClientService {
     };
   }
 
+  /**
+   * Creates a deep copy of the given authInfos multimap.
+   */
+  private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
+    Multimap<String, byte[]> result = ArrayListMultimap.create();
+
+    for (Map.Entry<String, byte[]> entry : authInfos.entries()) {
+      byte[] info = entry.getValue();
+      result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length));
+    }
+
+    return result;
+  }
+
   private final class ServiceDelegate extends AbstractService implements Watcher {
 
     @Override
@@ -374,7 +384,7 @@ public final class DefaultZKClientService implements ZKClientService {
       };
 
       try {
-        zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, this));
+        zooKeeper.set(createZooKeeper());
       } catch (IOException e) {
         notifyFailed(e);
       }
@@ -410,7 +420,7 @@ public final class DefaultZKClientService implements ZKClientService {
             @Override
             public void run() {
               try {
-                zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, ServiceDelegate.this));
+                zooKeeper.set(createZooKeeper());
               } catch (IOException e) {
                 zooKeeper.set(null);
                 notifyFailed(e);
@@ -428,6 +438,17 @@ public final class DefaultZKClientService implements ZKClientService {
         }
       }
     }
+
+    /**
+     * Creates a new ZooKeeper connection.
+     */
+    private ZooKeeper createZooKeeper() throws IOException {
+      ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, this);
+      for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
+        zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
+      }
+      return zk;
+    }
   }
 
   /**
@@ -521,5 +542,19 @@ public final class DefaultZKClientService implements ZKClientService {
         result.setException(KeeperException.create(code, result.getRequestPath()));
       }
     };
+
+    static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
+        SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(new BasicACLData(acl, stat));
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
index aa11730..73ee308 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
@@ -21,6 +21,7 @@ import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.ACLData;
 import org.apache.twill.zookeeper.ForwardingZKClient;
 import org.apache.twill.zookeeper.NodeChildren;
 import org.apache.twill.zookeeper.NodeData;
@@ -30,12 +31,14 @@ import org.apache.twill.zookeeper.RetryStrategy.OperationType;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
 
 /**
  * A {@link ZKClient} that will invoke {@link RetryStrategy} on operation failure.
@@ -56,37 +59,26 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
   }
 
   @Override
-  public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
-    return create(path, data, createMode, true);
-  }
-
-  @Override
-  public OperationFuture<String> create(final String path, final byte[] data,
-                                        final CreateMode createMode, final boolean createParent) {
-
+  public OperationFuture<String> create(final String path, @Nullable final byte[] data, final CreateMode createMode,
+                                        final boolean createParent, final Iterable<ACL> acl) {
     // No retry for any SEQUENTIAL node, as some algorithms depends on only one sequential node being created.
     if (createMode == CreateMode.PERSISTENT_SEQUENTIAL || createMode == CreateMode.EPHEMERAL_SEQUENTIAL) {
-      return super.create(path, data, createMode, createParent);
+      return super.create(path, data, createMode, createParent, acl);
     }
 
     final SettableOperationFuture<String> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
-    Futures.addCallback(super.create(path, data, createMode, createParent),
+    Futures.addCallback(super.create(path, data, createMode, createParent, acl),
                         new OperationFutureCallback<String>(OperationType.CREATE, System.currentTimeMillis(),
                                                             path, result, new Supplier<OperationFuture<String>>() {
                           @Override
                           public OperationFuture<String> get() {
-                            return FailureRetryZKClient.super.create(path, data, createMode, createParent);
+                            return FailureRetryZKClient.super.create(path, data, createMode, createParent, acl);
                           }
                         }));
     return result;
   }
 
   @Override
-  public OperationFuture<Stat> exists(String path) {
-    return exists(path, null);
-  }
-
-  @Override
   public OperationFuture<Stat> exists(final String path, final Watcher watcher) {
     final SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
     Futures.addCallback(super.exists(path, watcher),
@@ -101,11 +93,6 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
   }
 
   @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return getChildren(path, null);
-  }
-
-  @Override
   public OperationFuture<NodeChildren> getChildren(final String path, final Watcher watcher) {
     final SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path,
                                                                                         Threads.SAME_THREAD_EXECUTOR);
@@ -122,11 +109,6 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
   }
 
   @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return getData(path, null);
-  }
-
-  @Override
   public OperationFuture<NodeData> getData(final String path, final Watcher watcher) {
     final SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
     Futures.addCallback(super.getData(path, watcher),
@@ -141,11 +123,6 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
   }
 
   @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return setData(path, data, -1);
-  }
-
-  @Override
   public OperationFuture<Stat> setData(final String dataPath, final byte[] data, final int version) {
     final SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, Threads.SAME_THREAD_EXECUTOR);
     Futures.addCallback(super.setData(dataPath, data, version),
@@ -160,11 +137,6 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
   }
 
   @Override
-  public OperationFuture<String> delete(String path) {
-    return delete(path, -1);
-  }
-
-  @Override
   public OperationFuture<String> delete(final String deletePath, final int version) {
     final SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath,
                                                                                   Threads.SAME_THREAD_EXECUTOR);
@@ -180,6 +152,34 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
     return result;
   }
 
+  @Override
+  public OperationFuture<ACLData> getACL(final String path) {
+    final SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+    Futures.addCallback(super.getACL(path),
+                        new OperationFutureCallback<ACLData>(OperationType.GET_ACL, System.currentTimeMillis(),
+                                                          path, result, new Supplier<OperationFuture<ACLData>>() {
+                          @Override
+                          public OperationFuture<ACLData> get() {
+                            return FailureRetryZKClient.super.getACL(path);
+                          }
+                        }));
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> setACL(final String path, final Iterable<ACL> acl, final int version) {
+    final SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+    Futures.addCallback(super.setACL(path, acl, version),
+                        new OperationFutureCallback<Stat>(OperationType.SET_ACL, System.currentTimeMillis(),
+                                                          path, result, new Supplier<OperationFuture<Stat>>() {
+                          @Override
+                          public OperationFuture<Stat> get() {
+                            return FailureRetryZKClient.super.setACL(path, acl, version);
+                          }
+                        }));
+    return result;
+  }
+
   /**
    * Callback to watch for operation result and trigger retry if necessary.
    * @param <V> Type of operation result.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
index 7d3c268..beca5e8 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.zookeeper;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.ACLData;
 import org.apache.twill.zookeeper.ForwardingZKClient;
 import org.apache.twill.zookeeper.NodeChildren;
 import org.apache.twill.zookeeper.NodeData;
@@ -27,6 +28,7 @@ import org.apache.twill.zookeeper.OperationFuture;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 import javax.annotation.Nullable;
@@ -65,65 +67,45 @@ public final class NamespaceZKClient extends ForwardingZKClient {
   }
 
   @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
-    return relayPath(delegate.create(namespace + path, data, createMode), this.<String>createFuture(path));
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
-                                        boolean createParent) {
-    return relayPath(delegate.create(namespace + path, data, createMode, createParent),
+  public OperationFuture<String> create(String path, @Nullable byte[] data,
+                                        CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
+    return relayPath(delegate.create(namespace + path, data, createMode, createParent, acl),
                      this.<String>createFuture(path));
   }
 
   @Override
-  public OperationFuture<Stat> exists(String path) {
-    return relayFuture(delegate.exists(namespace + path), this.<Stat>createFuture(path));
-  }
-
-  @Override
   public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
     return relayFuture(delegate.exists(namespace + path, watcher), this.<Stat>createFuture(path));
   }
 
   @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return relayFuture(delegate.getChildren(namespace + path), this.<NodeChildren>createFuture(path));
-  }
-
-  @Override
   public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
     return relayFuture(delegate.getChildren(namespace + path, watcher), this.<NodeChildren>createFuture(path));
   }
 
   @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return relayFuture(delegate.getData(namespace + path), this.<NodeData>createFuture(path));
-  }
-
-  @Override
   public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
     return relayFuture(delegate.getData(namespace + path, watcher), this.<NodeData>createFuture(path));
   }
 
   @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return relayFuture(delegate.setData(namespace + path, data), this.<Stat>createFuture(path));
+  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+    return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
   }
 
   @Override
-  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
-    return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
+  public OperationFuture<String> delete(String deletePath, int version) {
+    return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
   }
 
   @Override
-  public OperationFuture<String> delete(String path) {
-    return relayPath(delegate.delete(namespace + path), this.<String>createFuture(path));
+  public OperationFuture<ACLData> getACL(String path) {
+    return relayFuture(delegate.getACL(namespace + path), this.<ACLData>createFuture(path));
   }
 
   @Override
-  public OperationFuture<String> delete(String deletePath, int version) {
-    return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
+  public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+    return relayFuture(delegate.setACL(namespace + path, acl, version), this.<Stat>createFuture(path));
   }
 
   private <V> SettableOperationFuture<V> createFuture(String path) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
index bd3bd2d..ed0e0bd 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
@@ -41,7 +41,7 @@ public final class RewatchOnExpireZKClient extends ForwardingZKClient {
   @Override
   public OperationFuture<Stat> exists(String path, Watcher watcher) {
     if (watcher == null) {
-      return super.exists(path, watcher);
+      return super.exists(path, null);
     }
     final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.EXISTS, path, watcher);
     OperationFuture<Stat> result = super.exists(path, wrappedWatcher);
@@ -62,7 +62,7 @@ public final class RewatchOnExpireZKClient extends ForwardingZKClient {
   @Override
   public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
     if (watcher == null) {
-      return super.getChildren(path, watcher);
+      return super.getChildren(path, null);
     }
     final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.CHILDREN, path, watcher);
     OperationFuture<NodeChildren> result = super.getChildren(path, wrappedWatcher);
@@ -83,7 +83,7 @@ public final class RewatchOnExpireZKClient extends ForwardingZKClient {
   @Override
   public OperationFuture<NodeData> getData(String path, Watcher watcher) {
     if (watcher == null) {
-      return super.getData(path, watcher);
+      return super.getData(path, null);
     }
     final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.DATA, path, watcher);
     OperationFuture<NodeData> result = super.getData(path, wrappedWatcher);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ACLData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ACLData.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ACLData.java
new file mode 100644
index 0000000..d3a6836
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ACLData.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * Represents result of call to {@link ZKClient#getACL(String)}.
+ */
+public interface ACLData {
+
+  /**
+   * @return list of {@link ACL} for the node.
+   */
+  List<ACL> getACL();
+
+  /**
+   * @return The {@link Stat} of the node.
+   */
+  Stat getStat();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/AbstractZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/AbstractZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/AbstractZKClient.java
new file mode 100644
index 0000000..208b536
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/AbstractZKClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import javax.annotation.Nullable;
+
+/**
+ * An abstract base implementation of {@link ZKClient} that simplifies implementation by providing forwarding for
+ * methods that are meant to be delegated to other methods.
+ */
+public abstract class AbstractZKClient implements ZKClient {
+
+  @Override
+  public final OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
+    return create(path, data, createMode, true);
+  }
+
+  @Override
+  public final OperationFuture<String> create(String path, @Nullable byte[] data,
+                                        CreateMode createMode, boolean createParent) {
+    return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  @Override
+  public final OperationFuture<String> create(String path, @Nullable byte[] data,
+                                              CreateMode createMode, Iterable<ACL> acl) {
+    return create(path, data, createMode, true, acl);
+  }
+
+  @Override
+  public final OperationFuture<Stat> exists(String path) {
+    return exists(path, null);
+  }
+
+  @Override
+  public final OperationFuture<NodeChildren> getChildren(String path) {
+    return getChildren(path, null);
+  }
+
+  @Override
+  public final OperationFuture<NodeData> getData(String path) {
+    return getData(path, null);
+  }
+
+  @Override
+  public final OperationFuture<Stat> setData(String path, byte[] data) {
+    return setData(path, data, -1);
+  }
+
+  @Override
+  public final OperationFuture<String> delete(String path) {
+    return delete(path, -1);
+  }
+
+  @Override
+  public final OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) {
+    return setACL(path, acl, -1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
index 3f3003d..149f7e7 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
@@ -19,6 +19,7 @@ package org.apache.twill.zookeeper;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 import javax.annotation.Nullable;
@@ -26,7 +27,7 @@ import javax.annotation.Nullable;
 /**
  *
  */
-public abstract class ForwardingZKClient implements ZKClient {
+public abstract class ForwardingZKClient extends AbstractZKClient {
 
   private final ZKClient delegate;
 
@@ -54,19 +55,9 @@ public abstract class ForwardingZKClient implements ZKClient {
   }
 
   @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
-    return create(path, data, createMode, true);
-  }
-
-  @Override
-  public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
-                                        boolean createParent) {
-    return delegate.create(path, data, createMode, createParent);
-  }
-
-  @Override
-  public OperationFuture<Stat> exists(String path) {
-    return exists(path, null);
+  public OperationFuture<String> create(String path, @Nullable byte[] data,
+                                        CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
+    return delegate.create(path, data, createMode, createParent, acl);
   }
 
   @Override
@@ -75,42 +66,32 @@ public abstract class ForwardingZKClient implements ZKClient {
   }
 
   @Override
-  public OperationFuture<NodeChildren> getChildren(String path) {
-    return getChildren(path, null);
-  }
-
-  @Override
   public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
     return delegate.getChildren(path, watcher);
   }
 
   @Override
-  public OperationFuture<NodeData> getData(String path) {
-    return getData(path, null);
-  }
-
-  @Override
   public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
     return delegate.getData(path, watcher);
   }
 
   @Override
-  public OperationFuture<Stat> setData(String path, byte[] data) {
-    return setData(path, data, -1);
+  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+    return delegate.setData(dataPath, data, version);
   }
 
   @Override
-  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
-    return delegate.setData(dataPath, data, version);
+  public OperationFuture<String> delete(String deletePath, int version) {
+    return delegate.delete(deletePath, version);
   }
 
   @Override
-  public OperationFuture<String> delete(String path) {
-    return delete(path, -1);
+  public OperationFuture<ACLData> getACL(String path) {
+    return delegate.getACL(path);
   }
 
   @Override
-  public OperationFuture<String> delete(String deletePath, int version) {
-    return delegate.delete(deletePath, version);
+  public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+    return delegate.setACL(path, acl, version);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
index ac15957..a9bd247 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
@@ -22,7 +22,7 @@ import org.apache.zookeeper.data.Stat;
 import javax.annotation.Nullable;
 
 /**
- * Represents result of call to {@link ZKClientService#getData(String, org.apache.zookeeper.Watcher)}.
+ * Represents result of call to {@link ZKClient#getData(String, org.apache.zookeeper.Watcher)}.
  */
 public interface NodeData {
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
index 3301e8a..fe174b2 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
@@ -31,7 +31,9 @@ public interface RetryStrategy {
     GET_CHILDREN,
     GET_DATA,
     SET_DATA,
-    DELETE
+    DELETE,
+    SET_ACL,
+    GET_ACL
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
index d60182e..8f8d6b2 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
@@ -19,6 +19,7 @@ package org.apache.twill.zookeeper;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 import javax.annotation.Nullable;
@@ -46,7 +47,7 @@ public interface ZKClient {
   void addConnectionWatcher(Watcher watcher);
 
   /**
-   * Same as calling
+   * Creates a path in zookeeper. Same as calling
    * {@link #create(String, byte[], org.apache.zookeeper.CreateMode, boolean) create(path, data, createMode, true)}.
    *
    * @see #create(String, byte[], org.apache.zookeeper.CreateMode, boolean)
@@ -54,17 +55,40 @@ public interface ZKClient {
   OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode);
 
   /**
+   * Creates a path in zookeeper. Same as calling
+   *
+   * {@link #create(String, byte[], org.apache.zookeeper.CreateMode, boolean, Iterable)
+   * create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE)}
+   *
+   * @see #create(String, byte[], org.apache.zookeeper.CreateMode, boolean, Iterable)
+   */
+  OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent);
+
+  /**
+   * Creates a path in zookeeper. Same as calling
+   *
+   * {@link #create(String, byte[], org.apache.zookeeper.CreateMode, boolean, Iterable)
+   * create(path, data, createMode, true, acl)}
+   *
+   * @see #create(String, byte[], org.apache.zookeeper.CreateMode, boolean, Iterable)
+   */
+  OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, Iterable<ACL> acl);
+
+  /**
    * Creates a path in zookeeper, with given data and create mode.
    *
    * @param path Path to be created
    * @param data Data to be stored in the node, or {@code null} if no data to store.
    * @param createMode The {@link org.apache.zookeeper.CreateMode} for the node.
    * @param createParent If {@code true} and parent nodes are missing, it will create all parent nodes as normal
-   *                     persistent node before creating the request node.
+   *                     persistent nodes with the ACL {@link org.apache.zookeeper.ZooDefs.Ids#OPEN_ACL_UNSAFE}
+   *                     before creating the requested node.
+   * @param acl Set of {@link ACL} to be set for the node being created.
    * @return A {@link OperationFuture} that will be completed when the
    *         creation is done. If there is error during creation, it will be reflected as error in the future.
    */
-  OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent);
+  OperationFuture<String> create(String path, @Nullable byte[] data,
+                                 CreateMode createMode, boolean createParent, Iterable<ACL> acl);
 
   /**
    * Checks if the path exists. Same as calling
@@ -158,4 +182,32 @@ public interface ZKClient {
    *         given as the future result. If there is error, it will be reflected as error in the future.
    */
   OperationFuture<String> delete(String deletePath, int version);
+
+  /**
+   * Retrieves the Stat and ACL of the node at the given path.
+   *
+   * @param path The path to get information from.
+   * @return A {@link OperationFuture} that will be completed when the getACL call is done, with the result given as
+   *         {@link ACLData}. If there is error, it will be reflected as error in the future.
+   */
+  OperationFuture<ACLData> getACL(String path);
+
+  /**
+   * Sets the ACL of the given path if the path exists. Same as calling
+   * {@link #setACL(String, Iterable, int) setACL(path, acl, -1)}
+   *
+   * @see #setACL(String, Iterable, int)
+   */
+  OperationFuture<Stat> setACL(String path, Iterable<ACL> acl);
+
+  /**
+   * Sets the ACL of the given path if the path exists and the version matches.
+   *
+   * @param path The path whose ACL is to be set.
+   * @param acl The ACL to set on the path.
+   * @param version Version of the node.
+   * @return A {@link OperationFuture} that will be completed when the setACL call is done, with the node {@link Stat}
+   *         available as the future result. If there is error, it will be reflected as error in the future.
+   */
+  OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
index 8d159af..33c23aa 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
@@ -18,13 +18,12 @@
 package org.apache.twill.zookeeper;
 
 import com.google.common.base.Supplier;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.Service;
 import org.apache.twill.internal.zookeeper.DefaultZKClientService;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
 
 /**
  * A {@link ZKClient} that extends from {@link Service} to provide lifecycle management functions.
@@ -50,7 +49,7 @@ public interface ZKClientService extends ZKClient, Service {
     private final String connectStr;
     private int timeout = 10000;
     private Watcher connectionWatcher;
-    private Multimap<String, ACL> acls = HashMultimap.create();
+    private Multimap<String, byte[]> auths = ArrayListMultimap.create();
 
     /**
      * Creates a {@link Builder} with the given ZooKeeper connection string.
@@ -82,11 +81,23 @@ public interface ZKClientService extends ZKClient, Service {
     }
 
     /**
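+     * Adds authentication information to be used by the client being built.
+     *
+     * @param schema The authentication scheme (for example, {@code "digest"}).
+     * @param auth The authentication data, as bytes, for the given scheme.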
+     * @return This builder.
+     */
+    public Builder addAuthInfo(String schema, byte[] auth) {
+      this.auths.put(schema, auth);
+      return this;
+    }
+
+    /**
      * Creates an instance of {@link ZKClientService} with the settings of this builder.
      * @return A new instance of {@link ZKClientService}.
      */
     public ZKClientService build() {
-      return new DefaultZKClientService(connectStr, timeout, connectionWatcher);
+      return new DefaultZKClientService(connectStr, timeout, connectionWatcher, auths);
     }
 
     private Builder(String connectStr) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
index cc38c76..7cdfd36 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
@@ -19,6 +19,7 @@ package org.apache.twill.zookeeper;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 import javax.annotation.Nullable;
@@ -77,20 +78,9 @@ public final class ZKClientServices {
         client.addConnectionWatcher(watcher);
       }
 
-      @Override
-      public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
-        return client.create(path, data, createMode);
-      }
-
-      @Override
-      public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
-                                            boolean createParent) {
-        return client.create(path, data, createMode, createParent);
-      }
-
-      @Override
-      public OperationFuture<Stat> exists(String path) {
-        return client.exists(path);
+      public OperationFuture<String> create(String path, @Nullable byte[] data,
+                                            CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
+        return client.create(path, data, createMode, createParent, acl);
       }
 
       @Override
@@ -99,44 +89,32 @@ public final class ZKClientServices {
       }
 
       @Override
-      public OperationFuture<NodeChildren> getChildren(String path) {
-        return client.getChildren(path);
-      }
-
-      @Override
       public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
         return client.getChildren(path, watcher);
       }
 
       @Override
-      public OperationFuture<NodeData> getData(String path) {
-        return client.getData(path);
-      }
-
-      @Override
       public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
         return client.getData(path, watcher);
       }
 
       @Override
-      public OperationFuture<Stat> setData(String path, byte[] data) {
-        return client.setData(path, data);
-      }
-
-      @Override
       public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
         return client.setData(dataPath, data, version);
       }
 
       @Override
-      public OperationFuture<String> delete(String path) {
-        return client.delete(path);
-      }
-
-      @Override
       public OperationFuture<String> delete(String deletePath, int version) {
         return client.delete(deletePath, version);
       }
+
+      public OperationFuture<ACLData> getACL(String path) {
+        return client.getACL(path);
+      }
+
+      public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+        return client.setACL(path, acl, version);
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 2228d46..40b0a39 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -20,19 +20,25 @@ package org.apache.twill.zookeeper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.google.common.io.Files;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.twill.internal.zookeeper.InMemoryZKServer;
 import org.apache.twill.internal.zookeeper.KillZKSession;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -48,6 +54,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class ZKClientTest {
 
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
   @Test
   public void testChroot() throws Exception {
     InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
@@ -201,8 +210,8 @@ public class ZKClientTest {
   }
 
   @Test
-  public void testRetry() throws ExecutionException, InterruptedException, TimeoutException {
-    File dataDir = Files.createTempDir();
+  public void testRetry() throws ExecutionException, InterruptedException, TimeoutException, IOException {
+    File dataDir = tmpFolder.newFolder();
     InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(dataDir).setTickTime(1000).build();
     zkServer.startAndWait();
     int port = zkServer.getLocalAddress().getPort();
@@ -251,4 +260,64 @@ public class ZKClientTest {
       zkServer.stopAndWait();
     }
   }
+
+  @Test
+  public void testACL() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException {
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).setTickTime(1000).build();
+    zkServer.startAndWait();
+
+    try {
+      String userPass = "user:pass";
+      String digest = DigestAuthenticationProvider.generateDigest(userPass);
+
+      // Creates two zkclients
+      ZKClientService zkClient = ZKClientService.Builder
+                                                .of(zkServer.getConnectionStr())
+                                                .addAuthInfo("digest", userPass.getBytes())
+                                                .build();
+      zkClient.startAndWait();
+
+      ZKClientService noAuthClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      noAuthClient.startAndWait();
+
+
+      // Create a node that is readable by all clients, but grants all permissions to the creator
+      String path = "/testacl";
+      zkClient.create(path, "test".getBytes(), CreateMode.PERSISTENT,
+                      ImmutableList.of(
+                        new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE),
+                        new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS)
+                      )).get();
+
+      // Verify the ACL
+      ACLData aclData = zkClient.getACL(path).get();
+      Assert.assertEquals(2, aclData.getACL().size());
+      ACL acl = aclData.getACL().get(1);
+      Assert.assertEquals(ZooDefs.Perms.ALL, acl.getPerms());
+      Assert.assertEquals("digest", acl.getId().getScheme());
+      Assert.assertEquals(digest, acl.getId().getId());
+
+      Assert.assertEquals("test", new String(noAuthClient.getData(path).get().getData()));
+
+      // Trying to write using the no-auth zk client should fail.
+      try {
+        noAuthClient.setData(path, "test2".getBytes()).get();
+        Assert.fail();
+      } catch (ExecutionException e) {
+        Assert.assertTrue(e.getCause() instanceof KeeperException.NoAuthException);
+      }
+
+      // Change ACL to make the node writable by anyone
+      zkClient.setACL(path, ImmutableList.of(new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.ANYONE_ID_UNSAFE))).get();
+
+      // Write again with the non-auth client, now should succeed.
+      noAuthClient.setData(path, "test2".getBytes()).get();
+
+      noAuthClient.stopAndWait();
+      zkClient.stopAndWait();
+
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
 }