You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/11/15 23:51:32 UTC

[drill] 03/07: DRILL-5671: Set secure ACLs (Access Control List) for Drill ZK nodes in a secure cluster

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ded9ba98d95648a5818e1030eeab89817bb8c2a5
Author: karthik <km...@maprtech.com>
AuthorDate: Wed Jul 11 14:01:04 2018 -0700

    DRILL-5671: Set secure ACLs (Access Control List) for Drill ZK nodes in a secure cluster
    
    closes #1467
---
 .../src/resources/drill-override-example.conf      |  16 ++
 .../java/org/apache/drill/exec/ExecConstants.java  |   2 +
 .../drill/exec/coord/zk/ZKACLContextProvider.java  |  28 ++++
 .../exec/coord/zk/ZKACLContextProviderImpl.java    |  31 ++++
 .../apache/drill/exec/coord/zk/ZKACLProvider.java  |  46 ++++++
 .../drill/exec/coord/zk/ZKACLProviderDelegate.java |  52 +++++++
 .../drill/exec/coord/zk/ZKACLProviderFactory.java  | 112 ++++++++++++++
 .../drill/exec/coord/zk/ZKACLProviderTemplate.java |  38 +++++
 .../drill/exec/coord/zk/ZKClusterCoordinator.java  |  21 ++-
 .../drill/exec/coord/zk/ZKDefaultACLProvider.java  |  44 ++++++
 .../drill/exec/coord/zk/ZKSecureACLProvider.java   |  74 +++++++++
 .../org/apache/drill/exec/server/Drillbit.java     |   2 +-
 .../java-exec/src/main/resources/drill-module.conf |  10 +-
 .../org/apache/drill/exec/coord/zk/TestZKACL.java  | 165 +++++++++++++++++++++
 exec/java-exec/src/test/resources/zkacltest.conf   |  28 ++++
 15 files changed, 662 insertions(+), 7 deletions(-)

diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index 296cd8b..5aa45a9 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -72,6 +72,22 @@ drill.exec: {
   	  count: 7200,
   	  delay: 500
   	}
+  	# This option controls whether Drill specifies ACLs when it creates znodes.
+  	# If this is 'false', then anyone has all privileges for all Drill znodes.
+  	# This corresponds to ZOO_OPEN_ACL_UNSAFE.
+  	# Setting this flag to 'true' enables the provider specified in "acl_provider"
+  	apply_secure_acl: false,
+
+  	# This option specified the ACL provider to be used by Drill.
+  	# Custom ACL providers can be provided in the Drillbit classpath and Drill can be made to pick them
+  	# by changing this option.
+  	# Note: This option has no effect if "apply_secure_acl" is 'false'
+  	#
+  	# The default "creator-all" will setup ACLs such that
+  	#    - Only the Drillbit user will have all privileges(create, delete, read, write, admin). Same as ZOO_CREATOR_ALL_ACL
+  	#    - Other users will only be able to read the cluster-discovery(list of Drillbits in the cluster) znodes.
+    #
+    acl_provider: "creator-all"
   },
   http: {
     enabled: true,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index eed4ff8..e7f0cd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -50,6 +50,8 @@ public final class ExecConstants {
   public static final String ZK_TIMEOUT = "drill.exec.zk.timeout";
   public static final String ZK_ROOT = "drill.exec.zk.root";
   public static final String ZK_REFRESH = "drill.exec.zk.refresh";
+  public static final String ZK_ACL_PROVIDER = "drill.exec.zk.acl_provider";
+  public static final String ZK_APPLY_SECURE_ACL = "drill.exec.zk.apply_secure_acl";
   public static final String BIT_RETRY_TIMES = "drill.exec.rpc.bit.server.retry.count";
   public static final String BIT_RETRY_DELAY = "drill.exec.rpc.bit.server.retry.delay";
   public static final String BIT_TIMEOUT = "drill.exec.bit.timeout";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLContextProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLContextProvider.java
new file mode 100644
index 0000000..dafd5f8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLContextProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+/**
+ * Defines the functions required by {@link ZKACLProviderDelegate} to access ZK-ACL related information
+ */
+
+public interface ZKACLContextProvider {
+
+  String getClusterPath();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLContextProviderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLContextProviderImpl.java
new file mode 100644
index 0000000..fc29208
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLContextProviderImpl.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+public class ZKACLContextProviderImpl implements ZKACLContextProvider {
+
+  private final String clusterPath;
+
+  public ZKACLContextProviderImpl(String clusterPath) {
+    this.clusterPath = clusterPath;
+  }
+
+  @Override
+  public String getClusterPath() { return  clusterPath; }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProvider.java
new file mode 100644
index 0000000..54bee58
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+/**
+ * This class defines the methods that are required to specify
+ * ACLs on Drill ZK nodes
+ */
+
+public interface ZKACLProvider {
+
+  /**
+   * Returns the list of ZK ACLs {@link ACL} to apply by default
+   * on Drill znodes
+   * @return  List of ZK ACLs {@link ACL}
+   */
+  List<ACL> getDrillDefaultAcl();
+
+  /**
+   * Returns the list of ZK ACLs {@link ACL} to apply for a specific
+   * Drill znode
+   * @param path The path for which the ACL is being looked up
+   * @return List of ZK ACLs {@link ACL} for the provided path
+   */
+  List<ACL> getDrillAclForPath(String path);
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderDelegate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderDelegate.java
new file mode 100644
index 0000000..a3f7ee3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderDelegate.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+/**
+ * This class hides the {@link ZKACLProvider} from Curator-specific functions
+ *
+ * This is done so that ACL Providers have to be aware only about ZK ACLs and the Drill {@link ZKACLProvider} interface
+ * ACL Providers should not be concerned with the framework (Curator) used by Drill to access ZK.
+ * If Drill stops using Curator, then existing {@link ZKACLProvider} implementations will still work.
+ */
+
+public class ZKACLProviderDelegate implements ACLProvider {
+  @VisibleForTesting
+  ZKACLProvider aclProvider;
+
+  public ZKACLProviderDelegate(ZKACLProvider aclProvider) {
+    this.aclProvider = aclProvider;
+  }
+
+  @Override
+  final public List<ACL> getDefaultAcl() {
+    return aclProvider.getDrillDefaultAcl();
+  }
+
+  @Override
+  final public List<ACL> getAclForPath(String path) {
+    return aclProvider.getDrillAclForPath(path);
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderFactory.java
new file mode 100644
index 0000000..f43aec0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import org.apache.drill.common.config.DrillConfig;
+import static org.apache.drill.exec.ExecConstants.ZK_ACL_PROVIDER;
+import static org.apache.drill.exec.ExecConstants.ZK_APPLY_SECURE_ACL;
+
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+
+/**
+ * This factory returns a {@link ZKACLProviderDelegate} which will be used to set ACLs on Drill ZK nodes
+ * If secure ACLs are required, the {@link ZKACLProviderFactory} looks up and instantiates a {@link ZKACLProviderDelegate}
+ * specified in the config file. Else it returns the {@link ZKDefaultACLProvider}
+ */
+public class ZKACLProviderFactory {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKACLProviderFactory.class);
+
+    public static ZKACLProviderDelegate getACLProvider(DrillConfig config, String drillClusterPath, BootStrapContext context)
+            throws DrillbitStartupException {
+        ZKACLContextProvider stateProvider = new ZKACLContextProviderImpl(drillClusterPath);
+
+        if (config.getBoolean(ZK_APPLY_SECURE_ACL)) {
+            logger.trace("Using secure ZK ACL. Drill cluster path " + drillClusterPath);
+            ZKACLProviderDelegate aclProvider = findACLProvider(config, stateProvider, context);
+            return aclProvider;
+        }
+        logger.trace("Using un-secure default ZK ACL");
+        final ZKDefaultACLProvider defaultAclProvider = new ZKDefaultACLProvider(stateProvider);
+        return new ZKACLProviderDelegate(defaultAclProvider);
+    }
+
+    public static ZKACLProviderDelegate findACLProvider(DrillConfig config, ZKACLContextProvider contextProvider,
+                                                        BootStrapContext context) throws DrillbitStartupException {
+        if (!config.hasPath(ZK_ACL_PROVIDER)) {
+            throw new DrillbitStartupException(String.format("BOOT option '%s' is missing in config.", ZK_ACL_PROVIDER));
+        }
+
+        final String aclProviderName = config.getString(ZK_ACL_PROVIDER);
+
+        if (Strings.isNullOrEmpty(aclProviderName)) {
+            throw new DrillbitStartupException(String.format("Invalid value '%s' for BOOT option '%s'", aclProviderName,
+                    ZK_ACL_PROVIDER));
+        }
+
+        ScanResult scan = context.getClasspathScan();
+        final Collection<Class<? extends ZKACLProvider>> aclProviderImpls =
+                scan.getImplementations(ZKACLProvider.class);
+        logger.debug("Found ZkACLProvider implementations: {}", aclProviderImpls);
+
+        for (Class<? extends ZKACLProvider> clazz : aclProviderImpls) {
+          final ZKACLProviderTemplate template = clazz.getAnnotation(ZKACLProviderTemplate.class);
+          if (template == null) {
+            logger.warn("{} doesn't have {} annotation. Skipping.", clazz.getCanonicalName(),
+                    ZKACLProviderTemplate.class);
+            continue;
+          }
+
+          if (template.type().equalsIgnoreCase(aclProviderName)) {
+            Constructor<?> validConstructor = null;
+            Class constructorArgumentClass = ZKACLContextProvider.class;
+            for (Constructor<?> c : clazz.getConstructors()) {
+              Class<?>[] params = c.getParameterTypes();
+              if (params.length == 1 && params[0] == constructorArgumentClass) {
+                validConstructor = c;
+                break;
+              }
+            }
+            if (validConstructor == null) {
+              logger.warn("Skipping ZKACLProvider implementation class '{}' since it doesn't " +
+                       "implement a constructor [{}({})]", clazz.getCanonicalName(), clazz.getName(),
+                      constructorArgumentClass.getName());
+              continue;
+            }
+            try {
+              final ZKACLProvider aclProvider = (ZKACLProvider) validConstructor.newInstance(contextProvider);
+              return new ZKACLProviderDelegate(aclProvider);
+            } catch (ReflectiveOperationException e ) {
+               throw new DrillbitStartupException(
+                  String.format("Failed to create and initialize the ZKACLProvider class '%s'",
+                                    clazz.getCanonicalName()), e);
+            }
+          }
+        }
+        String errMsg = String.format("Failed to find the implementation of '%s' for type '%s'",
+                ZKACLProvider.class.getCanonicalName(), aclProviderName);
+        logger.error(errMsg);
+        throw new DrillbitStartupException(errMsg);
+    }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderTemplate.java
new file mode 100644
index 0000000..2fed3c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKACLProviderTemplate.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for {@link ZKACLProviderDelegate} implementation to identify the
+ * implementation type. Implementation type is set in BOOT option <i>drill.exec.zk.acl_provider</i>.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface ZKACLProviderTemplate {
+  /**
+   * {@link ZKACLProviderDelegate} implementation type.
+   * @return Returns a name describing the type of the ACL Provider
+   */
+  String type();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index 82e45b2..853c410 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -31,11 +31,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.shaded.guava.com.google.common.base.Throwables;
 import org.apache.commons.collections.keyvalue.MultiKey;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
@@ -78,11 +81,18 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   private ConcurrentHashMap<MultiKey, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<MultiKey,DrillbitEndpoint>();
   private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");
 
-  public ZKClusterCoordinator(DrillConfig config) throws IOException{
-    this(config, null);
+  public ZKClusterCoordinator(DrillConfig config, String connect)
+          throws IOException, DrillbitStartupException {
+    this(config, connect, null);
   }
 
-  public ZKClusterCoordinator(DrillConfig config, String connect) throws IOException {
+  public ZKClusterCoordinator(DrillConfig config, BootStrapContext context)
+          throws IOException, DrillbitStartupException {
+    this(config, null, context);
+  }
+
+  public ZKClusterCoordinator(DrillConfig config, String connect, BootStrapContext context)
+          throws IOException, DrillbitStartupException {
     connect = connect == null || connect.isEmpty() ? config.getString(ExecConstants.ZK_CONNECTION) : connect;
     String clusterId = config.getString(ExecConstants.SERVICE_NAME);
     String zkRoot = config.getString(ExecConstants.ZK_ROOT);
@@ -99,6 +109,10 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
     logger.debug("Connect {}, zkRoot {}, clusterId: " + clusterId, connect, zkRoot);
 
     this.serviceName = clusterId;
+    String drillClusterPath = "/" + zkRoot + "/" +  clusterId;
+
+    ACLProvider aclProvider = ZKACLProviderFactory.getACLProvider(config, drillClusterPath, context);
+
     RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
       config.getInt(ExecConstants.ZK_RETRY_DELAY));
     curator = CuratorFrameworkFactory.builder()
@@ -106,6 +120,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
       .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
       .retryPolicy(rp)
       .connectString(connect)
+      .aclProvider(aclProvider)
       .build();
     curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
     curator.start();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKDefaultACLProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKDefaultACLProvider.java
new file mode 100644
index 0000000..c3151f4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKDefaultACLProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+/**
+ * ZKDefaultACLProvider provides the ACLs for znodes created in a un-secure installation.
+ *
+ * If this ACLProvider is used, everyone gets unrestricted access to the Drill znodes
+ *
+ */
+@ZKACLProviderTemplate(type = "open")
+public class ZKDefaultACLProvider implements ZKACLProvider {
+
+    public ZKDefaultACLProvider(ZKACLContextProvider context) { }
+
+    public List<ACL> getDrillDefaultAcl() {
+        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    }
+
+    public List<ACL> getDrillAclForPath(String path) {
+        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKSecureACLProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKSecureACLProvider.java
new file mode 100644
index 0000000..c36e65c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKSecureACLProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+/**
+ * ZKSecureACLProvider restricts access to znodes created by Drill in a secure installation.
+ *
+ * For all other znodes, only the creator of the znode, i.e the Drillbit user, has full access.
+ * In addition to the above, all znodes under the cluster path are readable by anyone.
+ *
+ */
+@ZKACLProviderTemplate(type = "creator-all")
+public class ZKSecureACLProvider implements ZKACLProvider {
+
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKSecureACLProvider.class);
+
+    /**
+     * DEFAULT_ACL gives the creator of a znode full-access to it
+     */
+    private static final ImmutableList<ACL> DEFAULT_ACL = new ImmutableList.Builder<ACL>()
+                                              .addAll(Ids.CREATOR_ALL_ACL.iterator())
+                                              .build();
+    /**
+     * DRILL_CLUSTER_ACL gives the creator full access and everyone else only read access.
+     * Used on the Drillbit discovery znode (znode path /<drill.exec.zk.root>/<drill.exec.cluster-id>)
+     * i.e. the node that contains the list of Drillbits in the cluster.
+     */
+     private static final ImmutableList<ACL> DRILL_CLUSTER_ACL = new ImmutableList.Builder<ACL>()
+                                                .addAll(Ids.READ_ACL_UNSAFE.iterator())
+                                                .addAll(Ids.CREATOR_ALL_ACL.iterator())
+                                                .build();
+    final String drillClusterPath;
+
+    public ZKSecureACLProvider(ZKACLContextProvider contextProvider) {
+        this.drillClusterPath = contextProvider.getClusterPath();
+    }
+
+    @Override
+    public List<ACL> getDrillDefaultAcl() {
+        return DEFAULT_ACL;
+    }
+
+    @Override
+    public List<ACL> getDrillAclForPath(String path) {
+        logger.trace("getAclForPath " + path);
+        if (path.startsWith(drillClusterPath)) {
+            logger.trace("getAclForPath drillClusterPath " + drillClusterPath);
+            return DRILL_CLUSTER_ACL;
+        }
+        return DEFAULT_ACL;
+    }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index dd1c5f1..5b26ff7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -157,7 +157,7 @@ public class Drillbit implements AutoCloseable {
       coord = serviceSet.getCoordinator();
       storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config));
     } else {
-      coord = new ZKClusterCoordinator(config);
+      coord = new ZKClusterCoordinator(config, context);
       storeProvider = new PersistentStoreRegistry<ClusterCoordinator>(this.coord, config).newPStoreProvider();
     }
 
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 5981f2d..23245be 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -29,7 +29,8 @@ drill {
       org.apache.drill.exec.rpc.security.AuthenticatorFactory,
       org.apache.drill.exec.server.rest.auth.DrillHttpConstraintSecurityHandler,
       org.apache.drill.exec.store.dfs.FormatPlugin,
-      org.apache.drill.exec.store.StoragePlugin
+      org.apache.drill.exec.store.StoragePlugin,
+      org.apache.drill.exec.coord.zk.ZKACLProvider
     ],
 
     annotations: ${?drill.classpath.scanning.annotations} [
@@ -43,7 +44,8 @@ drill {
       org.apache.drill.exec.store,
       org.apache.drill.exec.rpc.user.security,
       org.apache.drill.exec.rpc.security,
-      org.apache.drill.exec.server.rest.auth
+      org.apache.drill.exec.server.rest.auth,
+      org.apache.drill.exec.coord.zk
     ],
 
     // caches scanned result during build time
@@ -123,7 +125,9 @@ drill.exec: {
     retry: {
       count: 7200,
       delay: 500
-    }
+    },
+    apply_secure_acl: false,
+    acl_provider: "creator-all"
   },
   http: {
     enabled: true,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZKACL.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZKACL.java
new file mode 100644
index 0000000..07e0465
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZKACL.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.drill.categories.SecurityTest;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.zookeeper.data.ACL;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+
+@Ignore("See DRILL-6823")
+@Category(SecurityTest.class)
+public class TestZKACL {
+
+  private TestingServer server;
+  private final static String cluster_config_znode = "test-cluster_config_znode";
+  private final static byte[] cluster_config_data = "drill-node-1".getBytes();
+  private final static String drill_zk_root = "drill-test-drill_zk_root";
+  private final static String drill_cluster_name = "test-drillbits";
+  private static final String drillClusterPath = "/" + drill_zk_root + "/" + drill_cluster_name;
+  private static final RetryPolicy retryPolicy = new RetryNTimes(1, 1000);
+
+  private static final String drillUDFName = "test-udfs";
+  private final static byte[] udf_data = "test-udf-1".getBytes();
+  private static final String drillUDFPath = "/" + drill_zk_root + "/" + drillUDFName;
+  private static ACLProvider aclProviderDelegate;
+
+  private static CuratorFramework client;
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+    String configPath = (ClassLoader.getSystemResource("zkacltest.conf")).getPath();
+    System.setProperty("java.security.auth.login.config", configPath);
+    server = new TestingServer();
+
+
+    final DrillConfig config = new DrillConfig(DrillConfig.create().withValue(ExecConstants.ZK_ACL_PROVIDER,
+            ConfigValueFactory.fromAnyRef("creator-all")
+    ).withValue(ExecConstants.ZK_APPLY_SECURE_ACL, ConfigValueFactory.fromAnyRef(true)));
+
+    final ScanResult result = ClassPathScanner.fromPrescan(config);
+    final BootStrapContext bootStrapContext =
+            new BootStrapContext(config, SystemOptionManager.createDefaultOptionDefinitions(), result);
+    aclProviderDelegate = ZKACLProviderFactory.getACLProvider(config, drillClusterPath, bootStrapContext);
+
+    server.start();
+
+    client =  CuratorFrameworkFactory.builder().
+            retryPolicy(retryPolicy).
+            connectString(server.getConnectString()).
+            aclProvider(aclProviderDelegate).
+            build();
+    client.start();
+  }
+
+  /**
+   * Test ACLs on znodes required to discover the cluster
+   *
+   * ZK libraries only supports one client instance per-machine per-server and it is cached.
+   * This test will fail when run after other ZK tests that setup the client in a way that will cause this test to fail
+   */
+
+  @Test
+  public void testClusterDiscoveryPaths() {
+    try {
+      String path = PathUtils.join(drillClusterPath, cluster_config_znode);
+      client.create().creatingParentsIfNeeded().forPath(path, cluster_config_data);
+      List<ACL> remoteACLs = client.getACL().forPath(path);
+      List<ACL> desiredACLs = ((ZKACLProviderDelegate) aclProviderDelegate).aclProvider.getDrillAclForPath(drillClusterPath);
+
+      // Check the ACLs
+      for (ACL remote : remoteACLs) {
+        boolean found = false;
+        for (ACL desired : desiredACLs) {
+          // desiredACL list is READ_ACL_UNSAFE (READ, WORLD_ANYONE) + CREATOR_ALL_ACL(ALL, AUTH)
+          // AUTH in CREATOR_ALL would translate to SASL, username. Hence the replacement
+          // Note: The username("testuser1") should match the username in java.security.auth.login.config
+          found = desired.toString().equals(remote.toString().replace("sasl", "auth").replace("testuser1", ""));
+
+          if (found) { break; }
+        }
+        Assert.assertTrue(found);
+      }
+      // check if the data can be read
+      byte[] actual = client.getData().forPath(path);
+      Assert.assertArrayEquals("testClusterDiscoveryPaths data mismatch", cluster_config_data, actual);
+
+    } catch (Exception e) {
+      throw new IllegalStateException("testClusterDiscoveryPaths failed");
+    }
+  }
+
+  /**
+   * Test ACLs on znodes other than ones required to discover the cluster
+   *
+   * ZK libraries only supports one client instance per-machine per-server and it is cached.
+   * This test will fail when run after other ZK tests that setup the client in a way that will cause this test to fail
+   */
+  @Test
+  public void testNonClusterDiscoveryPaths() {
+    try {
+      client.create().creatingParentsIfNeeded().forPath(drillUDFPath, udf_data);
+      List<ACL> remoteACLs = client.getACL().forPath(drillUDFPath);
+      List<ACL> desiredACLs = ((ZKACLProviderDelegate) aclProviderDelegate).aclProvider.getDrillAclForPath(drillUDFPath);
+      Assert.assertEquals(remoteACLs.size(), desiredACLs.size());
+      for (ACL remote : remoteACLs) {
+        boolean found = false;
+        for (ACL desired : desiredACLs) {
+          // desiredACL list is READ_ACL_UNSAFE (READ, WORLD_ANYONE) + CREATOR_ALL_ACL(ALL, AUTH)
+          // AUTH in CREATOR_ALL would translate to SASL, username. Hence the replacement
+          // Note: The username("testuser1") should match the username in java.security.auth.login.config
+          found = desired.toString().equals(remote.toString().replace("sasl", "auth").replace("testuser1", ""));
+          if (found) { break; }
+        }
+        Assert.assertTrue(found);
+      }
+      // check if the data can be read
+      byte[] actual = client.getData().forPath(drillUDFPath);
+      Assert.assertArrayEquals("testNonClusterDiscoveryPaths data mismatch", udf_data, actual);
+
+    } catch (Exception e) {
+      throw new IllegalStateException("testNonClusterDiscoveryPaths failed");
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    client.close();
+    server.close();
+  }
+}
diff --git a/exec/java-exec/src/test/resources/zkacltest.conf b/exec/java-exec/src/test/resources/zkacltest.conf
new file mode 100644
index 0000000..e1b5455
--- /dev/null
+++ b/exec/java-exec/src/test/resources/zkacltest.conf
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file contains ZK client and server config for testing ZK ACLs
+
+  Server {
+            org.apache.zookeeper.server.auth.DigestLoginModule required
+            user_testuser1="testpassword"
+            user_testuser2="testpassword";
+           };
+                
+  Client {
+          	org.apache.zookeeper.server.auth.DigestLoginModule required
+          	username="testuser1"
+          	password="testpassword";
+          };