You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/18 22:37:45 UTC
git commit: (TWILL-74) Added authentication and ACL support to
ZKClient.
Repository: incubator-twill
Updated Branches:
refs/heads/master 2c3cf3968 -> 9393df80b
(TWILL-74) Added authentication and ACL support to ZKClient.
* New method, addAuthInfo, added to ZkClientService.Builder for creating ZKClient with authentication.
* New methods (create, getACL, setACL) are added to ZKClient to support ACLs.
* Refactor the ZKClient hierarchy to have an AbstractZKClient, to ensure correct method-delegation behavior in child classes.
Signed-off-by: Terence Yim <te...@continuuity.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/9393df80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/9393df80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/9393df80
Branch: refs/heads/master
Commit: 9393df80ba1c0b83a162c250c64ebd812d9db214
Parents: 2c3cf39
Author: Terence Yim <te...@continuuity.com>
Authored: Thu Apr 17 18:40:36 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Fri Apr 18 13:36:41 2014 -0700
----------------------------------------------------------------------
.../twill/internal/zookeeper/BasicACLData.java | 48 ++++++
.../zookeeper/DefaultZKClientService.java | 145 ++++++++++++-------
.../zookeeper/FailureRetryZKClient.java | 72 ++++-----
.../internal/zookeeper/NamespaceZKClient.java | 44 ++----
.../zookeeper/RewatchOnExpireZKClient.java | 6 +-
.../org/apache/twill/zookeeper/ACLData.java | 39 +++++
.../twill/zookeeper/AbstractZKClient.java | 80 ++++++++++
.../twill/zookeeper/ForwardingZKClient.java | 45 ++----
.../org/apache/twill/zookeeper/NodeData.java | 2 +-
.../apache/twill/zookeeper/RetryStrategy.java | 4 +-
.../org/apache/twill/zookeeper/ZKClient.java | 58 +++++++-
.../apache/twill/zookeeper/ZKClientService.java | 19 ++-
.../twill/zookeeper/ZKClientServices.java | 46 ++----
.../apache/twill/zookeeper/ZKClientTest.java | 75 +++++++++-
14 files changed, 480 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicACLData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicACLData.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicACLData.java
new file mode 100644
index 0000000..d31de8b
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicACLData.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * A straightforward implementation of {@link ACLData}.
+ */
+final class BasicACLData implements ACLData {
+
+ private final List<ACL> acl;
+ private final Stat stat;
+
+ BasicACLData(List<ACL> acl, Stat stat) {
+ this.acl = acl;
+ this.stat = stat;
+ }
+
+ @Override
+ public List<ACL> getACL() {
+ return acl;
+ }
+
+ @Override
+ public Stat getStat() {
+ return stat;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 84b3a8d..7c9bd08 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -17,15 +17,20 @@
*/
package org.apache.twill.internal.zookeeper;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.twill.zookeeper.AbstractZKClient;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
@@ -43,7 +48,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -56,33 +63,37 @@ import javax.annotation.Nullable;
/**
* The base implementation of {@link ZKClientService}.
*/
-public final class DefaultZKClientService implements ZKClientService {
+public final class DefaultZKClientService extends AbstractZKClient implements ZKClientService {
private static final Logger LOG = LoggerFactory.getLogger(DefaultZKClientService.class);
private final String zkStr;
private final int sessionTimeout;
private final List<Watcher> connectionWatchers;
+ private final Multimap<String, byte[]> authInfos;
private final AtomicReference<ZooKeeper> zooKeeper;
- private final Function<String, List<ACL>> aclMapper;
private final Service serviceDelegate;
private ExecutorService eventExecutor;
+ /**
+ * Creates a new instance.
+ * @deprecated Use {@link ZKClientService.Builder} instead.
+ */
+ @Deprecated
+ @SuppressWarnings("unused")
public DefaultZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher) {
+ this(zkStr, sessionTimeout, connectionWatcher, ImmutableMultimap.<String, byte[]>of());
+ }
+
+ public DefaultZKClientService(String zkStr, int sessionTimeout,
+ Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
this.zkStr = zkStr;
this.sessionTimeout = sessionTimeout;
this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+ this.authInfos = copyAuthInfo(authInfos);
addConnectionWatcher(connectionWatcher);
this.zooKeeper = new AtomicReference<ZooKeeper>();
-
- // TODO (terence): Add ACL
- aclMapper = new Function<String, List<ACL>>() {
- @Override
- public List<ACL> apply(String input) {
- return ZooDefs.Ids.OPEN_ACL_UNSAFE;
- }
- };
serviceDelegate = new ServiceDelegate();
}
@@ -105,23 +116,19 @@ public final class DefaultZKClientService implements ZKClientService {
}
@Override
- public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
- return create(path, data, createMode, true);
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data,
- CreateMode createMode, boolean createParent) {
- return doCreate(path, data, createMode, createParent, false);
+ public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
+ boolean createParent, Iterable<ACL> acl) {
+ return doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false);
}
private OperationFuture<String> doCreate(final String path,
- @Nullable final byte[] data,
- final CreateMode createMode,
- final boolean createParent,
- final boolean ignoreNodeExists) {
+ @Nullable final byte[] data,
+ final CreateMode createMode,
+ final boolean createParent,
+ final List<ACL> acl,
+ final boolean ignoreNodeExists) {
final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().create(path, data, aclMapper.apply(path), createMode, Callbacks.STRING, createFuture);
+ getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
if (!createParent) {
return createFuture;
}
@@ -148,14 +155,14 @@ public final class DefaultZKClientService implements ZKClientService {
result.setException(t);
return;
}
- // Watch for parent creation complete
- Futures.addCallback(
- doCreate(parentPath, null, CreateMode.PERSISTENT, createParent, true), new FutureCallback<String>() {
+ // Watch for parent creation complete. Parent is created with the unsafe ACL.
+ Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT,
+ true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() {
@Override
public void onSuccess(String parentPath) {
// Create the requested path again
Futures.addCallback(
- doCreate(path, data, createMode, false, ignoreNodeExists), new FutureCallback<String>() {
+ doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
@Override
public void onSuccess(String pathResult) {
result.set(pathResult);
@@ -219,11 +226,6 @@ public final class DefaultZKClientService implements ZKClientService {
}
@Override
- public OperationFuture<Stat> exists(String path) {
- return exists(path, null);
- }
-
- @Override
public OperationFuture<Stat> exists(String path, Watcher watcher) {
SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
@@ -231,11 +233,6 @@ public final class DefaultZKClientService implements ZKClientService {
}
@Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return getChildren(path, null);
- }
-
- @Override
public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
@@ -243,11 +240,6 @@ public final class DefaultZKClientService implements ZKClientService {
}
@Override
- public OperationFuture<NodeData> getData(String path) {
- return getData(path, null);
- }
-
- @Override
public OperationFuture<NodeData> getData(String path, Watcher watcher) {
SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
@@ -256,11 +248,6 @@ public final class DefaultZKClientService implements ZKClientService {
}
@Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return setData(path, data, -1);
- }
-
- @Override
public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
@@ -268,11 +255,6 @@ public final class DefaultZKClientService implements ZKClientService {
}
@Override
- public OperationFuture<String> delete(String path) {
- return delete(path, -1);
- }
-
- @Override
public OperationFuture<String> delete(String deletePath, int version) {
SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
@@ -280,6 +262,20 @@ public final class DefaultZKClientService implements ZKClientService {
}
@Override
+ public OperationFuture<ACLData> getACL(String path) {
+ SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result);
+ return result;
+ }
+
+ @Override
public Supplier<ZooKeeper> getZooKeeperSupplier() {
return new Supplier<ZooKeeper>() {
@Override
@@ -359,6 +355,20 @@ public final class DefaultZKClientService implements ZKClientService {
};
}
+ /**
+ * Creates a deep copy of the given authInfos multimap.
+ */
+ private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
+ Multimap<String, byte[]> result = ArrayListMultimap.create();
+
+ for (Map.Entry<String, byte[]> entry : authInfos.entries()) {
+ byte[] info = entry.getValue();
+ result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length));
+ }
+
+ return result;
+ }
+
private final class ServiceDelegate extends AbstractService implements Watcher {
@Override
@@ -374,7 +384,7 @@ public final class DefaultZKClientService implements ZKClientService {
};
try {
- zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, this));
+ zooKeeper.set(createZooKeeper());
} catch (IOException e) {
notifyFailed(e);
}
@@ -410,7 +420,7 @@ public final class DefaultZKClientService implements ZKClientService {
@Override
public void run() {
try {
- zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, ServiceDelegate.this));
+ zooKeeper.set(createZooKeeper());
} catch (IOException e) {
zooKeeper.set(null);
notifyFailed(e);
@@ -428,6 +438,17 @@ public final class DefaultZKClientService implements ZKClientService {
}
}
}
+
+ /**
+ * Creates a new ZooKeeper connection.
+ */
+ private ZooKeeper createZooKeeper() throws IOException {
+ ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, this);
+ for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
+ zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
+ }
+ return zk;
+ }
}
/**
@@ -521,5 +542,19 @@ public final class DefaultZKClientService implements ZKClientService {
result.setException(KeeperException.create(code, result.getRequestPath()));
}
};
+
+ static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
+ SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicACLData(acl, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
index aa11730..73ee308 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
@@ -21,6 +21,7 @@ import com.google.common.base.Supplier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.ACLData;
import org.apache.twill.zookeeper.ForwardingZKClient;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
@@ -30,12 +31,14 @@ import org.apache.twill.zookeeper.RetryStrategy.OperationType;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
/**
* A {@link ZKClient} that will invoke {@link RetryStrategy} on operation failure.
@@ -56,37 +59,26 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
}
@Override
- public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
- return create(path, data, createMode, true);
- }
-
- @Override
- public OperationFuture<String> create(final String path, final byte[] data,
- final CreateMode createMode, final boolean createParent) {
-
+ public OperationFuture<String> create(final String path, @Nullable final byte[] data, final CreateMode createMode,
+ final boolean createParent, final Iterable<ACL> acl) {
// No retry for any SEQUENTIAL node, as some algorithms depends on only one sequential node being created.
if (createMode == CreateMode.PERSISTENT_SEQUENTIAL || createMode == CreateMode.EPHEMERAL_SEQUENTIAL) {
- return super.create(path, data, createMode, createParent);
+ return super.create(path, data, createMode, createParent, acl);
}
final SettableOperationFuture<String> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
- Futures.addCallback(super.create(path, data, createMode, createParent),
+ Futures.addCallback(super.create(path, data, createMode, createParent, acl),
new OperationFutureCallback<String>(OperationType.CREATE, System.currentTimeMillis(),
path, result, new Supplier<OperationFuture<String>>() {
@Override
public OperationFuture<String> get() {
- return FailureRetryZKClient.super.create(path, data, createMode, createParent);
+ return FailureRetryZKClient.super.create(path, data, createMode, createParent, acl);
}
}));
return result;
}
@Override
- public OperationFuture<Stat> exists(String path) {
- return exists(path, null);
- }
-
- @Override
public OperationFuture<Stat> exists(final String path, final Watcher watcher) {
final SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
Futures.addCallback(super.exists(path, watcher),
@@ -101,11 +93,6 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
}
@Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return getChildren(path, null);
- }
-
- @Override
public OperationFuture<NodeChildren> getChildren(final String path, final Watcher watcher) {
final SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path,
Threads.SAME_THREAD_EXECUTOR);
@@ -122,11 +109,6 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
}
@Override
- public OperationFuture<NodeData> getData(String path) {
- return getData(path, null);
- }
-
- @Override
public OperationFuture<NodeData> getData(final String path, final Watcher watcher) {
final SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
Futures.addCallback(super.getData(path, watcher),
@@ -141,11 +123,6 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
}
@Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return setData(path, data, -1);
- }
-
- @Override
public OperationFuture<Stat> setData(final String dataPath, final byte[] data, final int version) {
final SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, Threads.SAME_THREAD_EXECUTOR);
Futures.addCallback(super.setData(dataPath, data, version),
@@ -160,11 +137,6 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
}
@Override
- public OperationFuture<String> delete(String path) {
- return delete(path, -1);
- }
-
- @Override
public OperationFuture<String> delete(final String deletePath, final int version) {
final SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath,
Threads.SAME_THREAD_EXECUTOR);
@@ -180,6 +152,34 @@ public final class FailureRetryZKClient extends ForwardingZKClient {
return result;
}
+ @Override
+ public OperationFuture<ACLData> getACL(final String path) {
+ final SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+ Futures.addCallback(super.getACL(path),
+ new OperationFutureCallback<ACLData>(OperationType.GET_ACL, System.currentTimeMillis(),
+ path, result, new Supplier<OperationFuture<ACLData>>() {
+ @Override
+ public OperationFuture<ACLData> get() {
+ return FailureRetryZKClient.super.getACL(path);
+ }
+ }));
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> setACL(final String path, final Iterable<ACL> acl, final int version) {
+ final SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
+ Futures.addCallback(super.setACL(path, acl, version),
+ new OperationFutureCallback<Stat>(OperationType.SET_ACL, System.currentTimeMillis(),
+ path, result, new Supplier<OperationFuture<Stat>>() {
+ @Override
+ public OperationFuture<Stat> get() {
+ return FailureRetryZKClient.super.setACL(path, acl, version);
+ }
+ }));
+ return result;
+ }
+
/**
* Callback to watch for operation result and trigger retry if necessary.
* @param <V> Type of operation result.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
index 7d3c268..beca5e8 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.zookeeper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.ACLData;
import org.apache.twill.zookeeper.ForwardingZKClient;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
@@ -27,6 +28,7 @@ import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import javax.annotation.Nullable;
@@ -65,65 +67,45 @@ public final class NamespaceZKClient extends ForwardingZKClient {
}
@Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
- return relayPath(delegate.create(namespace + path, data, createMode), this.<String>createFuture(path));
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
- boolean createParent) {
- return relayPath(delegate.create(namespace + path, data, createMode, createParent),
+ public OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
+ return relayPath(delegate.create(namespace + path, data, createMode, createParent, acl),
this.<String>createFuture(path));
}
@Override
- public OperationFuture<Stat> exists(String path) {
- return relayFuture(delegate.exists(namespace + path), this.<Stat>createFuture(path));
- }
-
- @Override
public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
return relayFuture(delegate.exists(namespace + path, watcher), this.<Stat>createFuture(path));
}
@Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return relayFuture(delegate.getChildren(namespace + path), this.<NodeChildren>createFuture(path));
- }
-
- @Override
public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
return relayFuture(delegate.getChildren(namespace + path, watcher), this.<NodeChildren>createFuture(path));
}
@Override
- public OperationFuture<NodeData> getData(String path) {
- return relayFuture(delegate.getData(namespace + path), this.<NodeData>createFuture(path));
- }
-
- @Override
public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
return relayFuture(delegate.getData(namespace + path, watcher), this.<NodeData>createFuture(path));
}
@Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return relayFuture(delegate.setData(namespace + path, data), this.<Stat>createFuture(path));
+ public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+ return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
}
@Override
- public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
- return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
+ public OperationFuture<String> delete(String deletePath, int version) {
+ return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
}
@Override
- public OperationFuture<String> delete(String path) {
- return relayPath(delegate.delete(namespace + path), this.<String>createFuture(path));
+ public OperationFuture<ACLData> getACL(String path) {
+ return relayFuture(delegate.getACL(namespace + path), this.<ACLData>createFuture(path));
}
@Override
- public OperationFuture<String> delete(String deletePath, int version) {
- return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
+ public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+ return relayFuture(delegate.setACL(namespace + path, acl, version), this.<Stat>createFuture(path));
}
private <V> SettableOperationFuture<V> createFuture(String path) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
index bd3bd2d..ed0e0bd 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java
@@ -41,7 +41,7 @@ public final class RewatchOnExpireZKClient extends ForwardingZKClient {
@Override
public OperationFuture<Stat> exists(String path, Watcher watcher) {
if (watcher == null) {
- return super.exists(path, watcher);
+ return super.exists(path, null);
}
final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.EXISTS, path, watcher);
OperationFuture<Stat> result = super.exists(path, wrappedWatcher);
@@ -62,7 +62,7 @@ public final class RewatchOnExpireZKClient extends ForwardingZKClient {
@Override
public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
if (watcher == null) {
- return super.getChildren(path, watcher);
+ return super.getChildren(path, null);
}
final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.CHILDREN, path, watcher);
OperationFuture<NodeChildren> result = super.getChildren(path, wrappedWatcher);
@@ -83,7 +83,7 @@ public final class RewatchOnExpireZKClient extends ForwardingZKClient {
@Override
public OperationFuture<NodeData> getData(String path, Watcher watcher) {
if (watcher == null) {
- return super.getData(path, watcher);
+ return super.getData(path, null);
}
final RewatchOnExpireWatcher wrappedWatcher = new RewatchOnExpireWatcher(this, ActionType.DATA, path, watcher);
OperationFuture<NodeData> result = super.getData(path, wrappedWatcher);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ACLData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ACLData.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ACLData.java
new file mode 100644
index 0000000..d3a6836
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ACLData.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * Represents result of call to {@link ZKClient#getACL(String)}.
+ */
+public interface ACLData {
+
+ /**
+ * @return list of {@link ACL} for the node.
+ */
+ List<ACL> getACL();
+
+ /**
+ * @return The {@link Stat} of the node.
+ */
+ Stat getStat();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/AbstractZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/AbstractZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/AbstractZKClient.java
new file mode 100644
index 0000000..208b536
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/AbstractZKClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.zookeeper;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import javax.annotation.Nullable;
+
+/**
+ * An abstract base implementation of {@link ZKClient}. Each convenience overload is implemented here as a
+ * {@code final} method that fills in default arguments and forwards to the fully-parameterized overload.
+ */
+public abstract class AbstractZKClient implements ZKClient {
+
+ @Override
+ public final OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
+ return create(path, data, createMode, true); // createParent defaults to true
+ }
+
+ @Override
+ public final OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent) {
+ return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ public final OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, Iterable<ACL> acl) {
+ return create(path, data, createMode, true, acl); // createParent defaults to true
+ }
+
+ @Override
+ public final OperationFuture<Stat> exists(String path) {
+ return exists(path, null); // null: no watcher
+ }
+
+ @Override
+ public final OperationFuture<NodeChildren> getChildren(String path) {
+ return getChildren(path, null); // null: no watcher
+ }
+
+ @Override
+ public final OperationFuture<NodeData> getData(String path) {
+ return getData(path, null); // null: no watcher
+ }
+
+ @Override
+ public final OperationFuture<Stat> setData(String path, byte[] data) {
+ return setData(path, data, -1); // NOTE(review): -1 is presumably ZooKeeper's "any version" value; confirm
+ }
+
+ @Override
+ public final OperationFuture<String> delete(String path) {
+ return delete(path, -1); // NOTE(review): -1 is presumably ZooKeeper's "any version" value; confirm
+ }
+
+ @Override
+ public final OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) {
+ return setACL(path, acl, -1); // NOTE(review): -1 is presumably ZooKeeper's "any version" value; confirm
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
index 3f3003d..149f7e7 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClient.java
@@ -19,6 +19,7 @@ package org.apache.twill.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import javax.annotation.Nullable;
@@ -26,7 +27,7 @@ import javax.annotation.Nullable;
/**
*
*/
-public abstract class ForwardingZKClient implements ZKClient {
+public abstract class ForwardingZKClient extends AbstractZKClient {
private final ZKClient delegate;
@@ -54,19 +55,9 @@ public abstract class ForwardingZKClient implements ZKClient {
}
@Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
- return create(path, data, createMode, true);
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
- boolean createParent) {
- return delegate.create(path, data, createMode, createParent);
- }
-
- @Override
- public OperationFuture<Stat> exists(String path) {
- return exists(path, null);
+ public OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
+ return delegate.create(path, data, createMode, createParent, acl);
}
@Override
@@ -75,42 +66,32 @@ public abstract class ForwardingZKClient implements ZKClient {
}
@Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return getChildren(path, null);
- }
-
- @Override
public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
return delegate.getChildren(path, watcher);
}
@Override
- public OperationFuture<NodeData> getData(String path) {
- return getData(path, null);
- }
-
- @Override
public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
return delegate.getData(path, watcher);
}
@Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return setData(path, data, -1);
+ public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+ return delegate.setData(dataPath, data, version);
}
@Override
- public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
- return delegate.setData(dataPath, data, version);
+ public OperationFuture<String> delete(String deletePath, int version) {
+ return delegate.delete(deletePath, version);
}
@Override
- public OperationFuture<String> delete(String path) {
- return delete(path, -1);
+ public OperationFuture<ACLData> getACL(String path) {
+ return delegate.getACL(path);
}
@Override
- public OperationFuture<String> delete(String deletePath, int version) {
- return delegate.delete(deletePath, version);
+ public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+ return delegate.setACL(path, acl, version);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
index ac15957..a9bd247 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/NodeData.java
@@ -22,7 +22,7 @@ import org.apache.zookeeper.data.Stat;
import javax.annotation.Nullable;
/**
- * Represents result of call to {@link ZKClientService#getData(String, org.apache.zookeeper.Watcher)}.
+ * Represents result of call to {@link ZKClient#getData(String, org.apache.zookeeper.Watcher)}.
*/
public interface NodeData {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
index 3301e8a..fe174b2 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/RetryStrategy.java
@@ -31,7 +31,9 @@ public interface RetryStrategy {
GET_CHILDREN,
GET_DATA,
SET_DATA,
- DELETE
+ DELETE,
+ SET_ACL,
+ GET_ACL
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
index d60182e..8f8d6b2 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClient.java
@@ -19,6 +19,7 @@ package org.apache.twill.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import javax.annotation.Nullable;
@@ -46,7 +47,7 @@ public interface ZKClient {
void addConnectionWatcher(Watcher watcher);
/**
- * Same as calling
+ * Creates a path in zookeeper. Same as calling
* {@link #create(String, byte[], org.apache.zookeeper.CreateMode, boolean) create(path, data, createMode, true)}.
*
* @see #create(String, byte[], org.apache.zookeeper.CreateMode, boolean)
@@ -54,17 +55,40 @@ public interface ZKClient {
OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode);
 /**
+ * Creates a path in zookeeper using {@code ZooDefs.Ids.OPEN_ACL_UNSAFE} as the ACL. Same as calling
+ *
+ * {@link #create(String, byte[], org.apache.zookeeper.CreateMode, boolean, Iterable)
+ * create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE)}.
+ *
+ * @see #create(String, byte[], org.apache.zookeeper.CreateMode, boolean, Iterable)
+ */
+ OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent);
+
+ /**
+ * Creates a path in zookeeper with the given ACL, creating missing parent nodes. Same as calling
+ *
+ * {@link #create(String, byte[], org.apache.zookeeper.CreateMode, boolean, Iterable)
+ * create(path, data, createMode, true, acl)}.
+ *
+ * @see #create(String, byte[], org.apache.zookeeper.CreateMode, boolean, Iterable)
+ */
+ OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, Iterable<ACL> acl);
+
+ /**
* Creates a path in zookeeper, with given data and create mode.
*
* @param path Path to be created
* @param data Data to be stored in the node, or {@code null} if no data to store.
* @param createMode The {@link org.apache.zookeeper.CreateMode} for the node.
* @param createParent If {@code true} and parent nodes are missing, it will create all parent nodes as normal
- * persistent node before creating the request node.
+ * persistent node with the ACL {@link org.apache.zookeeper.ZooDefs.Ids#OPEN_ACL_UNSAFE}
+ * before creating the request node.
+ * @param acl Set of {@link ACL} to be set for the node being created.
* @return A {@link OperationFuture} that will be completed when the
* creation is done. If there is error during creation, it will be reflected as error in the future.
*/
- OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent);
+ OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent, Iterable<ACL> acl);
/**
* Checks if the path exists. Same as calling
@@ -158,4 +182,32 @@ public interface ZKClient {
* given as the future result. If there is error, it will be reflected as error in the future.
*/
OperationFuture<String> delete(String deletePath, int version);
+
+ /**
+ * Retrieves the {@link Stat} and the ACL of the node at the given path.
+ *
+ * @param path The path to get information from.
+ * @return A {@link OperationFuture} that will be completed when the getACL call is done, with the result given as
+ * {@link ACLData}. If there is an error, it will be reflected as an error in the future.
+ */
+ OperationFuture<ACLData> getACL(String path);
+
+ /**
+ * Sets the ACL of the node at the given path if the path exists. Same as calling
+ * {@link #setACL(String, Iterable, int) setACL(path, acl, -1)}.
+ *
+ * @see #setACL(String, Iterable, int)
+ */
+ OperationFuture<Stat> setACL(String path, Iterable<ACL> acl);
+
+ /**
+ * Sets the ACL of the given path if the path exists and the version matches.
+ *
+ * @param path The path whose ACL is to be set.
+ * @param acl The ACL to set on the path.
+ * @param version Version of the node.
+ * @return A {@link OperationFuture} that will be completed when the setACL call is done, with the node {@link Stat}
+ * available as the future result. If there is an error, it will be reflected as an error in the future.
+ */
+ OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
index 8d159af..33c23aa 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientService.java
@@ -18,13 +18,12 @@
package org.apache.twill.zookeeper;
import com.google.common.base.Supplier;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Service;
import org.apache.twill.internal.zookeeper.DefaultZKClientService;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
/**
* A {@link ZKClient} that extends from {@link Service} to provide lifecycle management functions.
@@ -50,7 +49,7 @@ public interface ZKClientService extends ZKClient, Service {
private final String connectStr;
private int timeout = 10000;
private Watcher connectionWatcher;
- private Multimap<String, ACL> acls = HashMultimap.create();
+ private Multimap<String, byte[]> auths = ArrayListMultimap.create();
/**
* Creates a {@link Builder} with the given ZooKeeper connection string.
@@ -82,11 +81,23 @@ public interface ZKClientService extends ZKClient, Service {
}
 /**
+ * Adds authentication information to be passed to the {@link ZKClientService} created by {@link #build()}.
+ *
+ * @param schema The authentication scheme (for example {@code "digest"}).
+ * @param auth The authentication bytes for the given scheme.
+ * @return This builder.
+ */
+ public Builder addAuthInfo(String schema, byte[] auth) {
+ this.auths.put(schema, auth);
+ return this;
+ }
+
+ /**
* Creates an instance of {@link ZKClientService} with the settings of this builder.
* @return A new instance of {@link ZKClientService}.
*/
public ZKClientService build() {
- return new DefaultZKClientService(connectStr, timeout, connectionWatcher);
+ return new DefaultZKClientService(connectStr, timeout, connectionWatcher, auths);
}
private Builder(String connectStr) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
index cc38c76..7cdfd36 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKClientServices.java
@@ -19,6 +19,7 @@ package org.apache.twill.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import javax.annotation.Nullable;
@@ -77,20 +78,9 @@ public final class ZKClientServices {
client.addConnectionWatcher(watcher);
}
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
- return client.create(path, data, createMode);
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
- boolean createParent) {
- return client.create(path, data, createMode, createParent);
- }
-
- @Override
- public OperationFuture<Stat> exists(String path) {
- return client.exists(path);
+ // Forwards every argument, including the ACL, to client.create(...).
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
+ return client.create(path, data, createMode, createParent, acl);
 }
@Override
@@ -99,44 +89,32 @@ public final class ZKClientServices {
}
@Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return client.getChildren(path);
- }
-
- @Override
public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
return client.getChildren(path, watcher);
}
@Override
- public OperationFuture<NodeData> getData(String path) {
- return client.getData(path);
- }
-
- @Override
public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
return client.getData(path, watcher);
}
@Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return client.setData(path, data);
- }
-
- @Override
public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
return client.setData(dataPath, data, version);
}
@Override
- public OperationFuture<String> delete(String path) {
- return client.delete(path);
- }
-
- @Override
public OperationFuture<String> delete(String deletePath, int version) {
return client.delete(deletePath, version);
}
+
+ // Forwards the ACL lookup to client.getACL(path).
+ @Override
+ public OperationFuture<ACLData> getACL(String path) {
+ return client.getACL(path);
+ }
+
+ // Forwards the versioned ACL update to client.setACL(path, acl, version).
+ @Override
+ public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+ return client.setACL(path, acl, version);
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/9393df80/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 2228d46..40b0a39 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -20,19 +20,25 @@ package org.apache.twill.zookeeper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.io.Files;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.internal.zookeeper.KillZKSession;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Assert;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -48,6 +54,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class ZKClientTest {
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
@Test
public void testChroot() throws Exception {
InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
@@ -201,8 +210,8 @@ public class ZKClientTest {
}
@Test
- public void testRetry() throws ExecutionException, InterruptedException, TimeoutException {
- File dataDir = Files.createTempDir();
+ public void testRetry() throws ExecutionException, InterruptedException, TimeoutException, IOException {
+ File dataDir = tmpFolder.newFolder();
InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(dataDir).setTickTime(1000).build();
zkServer.startAndWait();
int port = zkServer.getLocalAddress().getPort();
@@ -251,4 +260,64 @@ public class ZKClientTest {
zkServer.stopAndWait();
}
}
+
+ /**
+ * Tests ACL support: creates a node that anyone can read but only the authenticated creator can modify,
+ * checks that an unauthenticated client is rejected on write, then changes the ACL and writes again.
+ */
+ @Test
+ public void testACL() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException {
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).setTickTime(1000).build();
+ zkServer.startAndWait();
+
+ try {
+ String userPass = "user:pass";
+ String digest = DigestAuthenticationProvider.generateDigest(userPass);
+
+ // Creates two zkclients. Each client is stopped in its own finally block so that a failed
+ // assertion or an exception does not leave a started client behind when the server is stopped.
+ ZKClientService zkClient = ZKClientService.Builder
+ .of(zkServer.getConnectionStr())
+ .addAuthInfo("digest", userPass.getBytes())
+ .build();
+ zkClient.startAndWait();
+
+ try {
+ ZKClientService noAuthClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+ noAuthClient.startAndWait();
+
+ try {
+ // Create a node that is readable by all client, but admin for the creator
+ String path = "/testacl";
+ zkClient.create(path, "test".getBytes(), CreateMode.PERSISTENT,
+ ImmutableList.of(
+ new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE),
+ new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS)
+ )).get();
+
+ // Verify the ACL
+ ACLData aclData = zkClient.getACL(path).get();
+ Assert.assertEquals(2, aclData.getACL().size());
+ ACL acl = aclData.getACL().get(1);
+ Assert.assertEquals(ZooDefs.Perms.ALL, acl.getPerms());
+ Assert.assertEquals("digest", acl.getId().getScheme());
+ Assert.assertEquals(digest, acl.getId().getId());
+
+ Assert.assertEquals("test", new String(noAuthClient.getData(path).get().getData()));
+
+ // When tries to write using the no-auth zk client, it should fail.
+ try {
+ noAuthClient.setData(path, "test2".getBytes()).get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof KeeperException.NoAuthException);
+ }
+
+ // Change ACL to make it open for all
+ zkClient.setACL(path, ImmutableList.of(new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.ANYONE_ID_UNSAFE))).get();
+
+ // Write again with the non-auth client, now should succeed.
+ noAuthClient.setData(path, "test2".getBytes()).get();
+ } finally {
+ noAuthClient.stopAndWait();
+ }
+ } finally {
+ zkClient.stopAndWait();
+ }
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
}