You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/05/27 22:13:59 UTC

[3/5] hbase git commit: HBASE-13768 ZooKeeper znodes are bootstrapped with insecure ACLs in a secure configuration (Enis Soztutar)

HBASE-13768 ZooKeeper znodes are bootstrapped with insecure ACLs in a secure configuration (Enis Soztutar)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/97e1510f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/97e1510f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/97e1510f

Branch: refs/heads/branch-1.1
Commit: 97e1510fc3fb8940fb902d787ee06d5ca806158e
Parents: 8a80287
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed May 27 12:21:47 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed May 27 12:36:36 2015 -0700

----------------------------------------------------------------------
 .../hbase/zookeeper/RecoverableZooKeeper.java   |  64 +++++
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   |  22 +-
 .../hbase/zookeeper/ZooKeeperWatcher.java       | 112 ++++++++-
 .../hbase/zookeeper/TestZooKeeperWatcher.java   |  59 +++++
 .../test/IntegrationTestZKAndFSPermissions.java | 232 +++++++++++++++++++
 .../org/apache/hadoop/hbase/master/HMaster.java |   5 +
 6 files changed, 479 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/97e1510f/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
index 4272671..f23ea4a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -464,6 +464,70 @@ public class RecoverableZooKeeper {
   }
 
   /**
+   * getAcl is an idempotent operation. Retry before throwing exception
+   * @return list of ACLs
+   */
+  public List<ACL> getAcl(String path, Stat stat)
+  throws KeeperException, InterruptedException {
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.getAcl");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          return checkZk().getACL(path, stat);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "getAcl");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
+  }
+
+  /**
+   * setAcl is an idempotent operation. Retry before throwing exception
+   * @return list of ACLs
+   */
+  public Stat setAcl(String path, List<ACL> acls, int version)
+  throws KeeperException, InterruptedException {
+    TraceScope traceScope = null;
+    try {
+      traceScope = Trace.startSpan("RecoverableZookeeper.setAcl");
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          return checkZk().setACL(path, acls, version);
+        } catch (KeeperException e) {
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+            case SESSIONEXPIRED:
+            case OPERATIONTIMEOUT:
+              retryOrThrow(retryCounter, e, "setAcl");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
+  }
+
+  /**
    * <p>
    * NONSEQUENTIAL create is idempotent operation.
    * Retry before throwing exceptions.

http://git-wip-us.apache.org/repos/asf/hbase/blob/97e1510f/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 55492e4..5519dc5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1001,7 +1001,11 @@ public class ZKUtil {
     try {
       javax.security.auth.login.Configuration testConfig =
           javax.security.auth.login.Configuration.getConfiguration();
-      if(testConfig.getAppConfigurationEntry("Client") == null) {
+      if (testConfig.getAppConfigurationEntry("Client") == null
+          && testConfig.getAppConfigurationEntry(
+            JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME) == null
+          && testConfig.getAppConfigurationEntry(
+              JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME) == null) {
         return false;
       }
     } catch(Exception e) {
@@ -1010,16 +1014,15 @@ public class ZKUtil {
     }
 
     // Master & RSs uses hbase.zookeeper.client.*
-    return("kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication")) &&
-         conf.get("hbase.zookeeper.client.keytab.file") != null);
+    return "kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication"));
   }
 
   private static ArrayList<ACL> createACL(ZooKeeperWatcher zkw, String node) {
     return createACL(zkw, node, isSecureZooKeeper(zkw.getConfiguration()));
   }
 
-  protected static ArrayList<ACL> createACL(ZooKeeperWatcher zkw, String node,
-      boolean isSecureZooKeeper) {
+  public static ArrayList<ACL> createACL(ZooKeeperWatcher zkw, String node,
+    boolean isSecureZooKeeper) {
     if (!node.startsWith(zkw.baseZNode)) {
       return Ids.OPEN_ACL_UNSAFE;
     }
@@ -1032,14 +1035,7 @@ public class ZKUtil {
       }
       // Certain znodes are accessed directly by the client,
       // so they must be readable by non-authenticated clients
-      if ((node.equals(zkw.baseZNode) == true) ||
-          (zkw.isAnyMetaReplicaZnode(node)) ||
-          (node.equals(zkw.getMasterAddressZNode()) == true) ||
-          (node.equals(zkw.clusterIdZNode) == true) ||
-          (node.equals(zkw.rsZNode) == true) ||
-          (node.equals(zkw.backupMasterAddressesZNode) == true) ||
-          (node.startsWith(zkw.assignmentZNode) == true) ||
-          (node.startsWith(zkw.tableZNode) == true)) {
+      if (zkw.isClientReadable(node)) {
         acls.addAll(Ids.CREATOR_ALL_ACL);
         acls.addAll(Ids.READ_ACL_UNSAFE);
       } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97e1510f/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 64e6e24..4dffcd3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.zookeeper;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,11 +35,16 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
@@ -194,6 +198,109 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
     }
   }
 
+  /** Returns whether the znode is supposed to be readable by the client
+   * and DOES NOT contain sensitive information (world readable).*/
+  public boolean isClientReadable(String node) {
+    // Developer notice: These znodes are world readable. DO NOT add more znodes here UNLESS
+    // all clients need to access this data to work. Using zk for sharing data to clients (other
+    // than service lookup case is not a recommended design pattern.
+    return
+        node.equals(baseZNode) ||
+        isAnyMetaReplicaZnode(node) ||
+        node.equals(getMasterAddressZNode()) ||
+        node.equals(clusterIdZNode)||
+        node.equals(rsZNode) ||
+        // /hbase/table and /hbase/table/foo is allowed, /hbase/table-lock is not
+        node.equals(tableZNode) ||
+        node.startsWith(tableZNode + "/");
+  }
+
+  /**
+   * On master start, we check the znode ACLs under the root directory and set the ACLs properly
+   * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
+   * so that the existing znodes created with open permissions are now changed with restrictive
+   * perms.
+   */
+  public void checkAndSetZNodeAcls() {
+    if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
+      return;
+    }
+
+    // Check the base znodes permission first. Only do the recursion if base znode's perms are not
+    // correct.
+    try {
+      List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat());
+
+      if (!isBaseZnodeAclSetup(actualAcls)) {
+        LOG.info("setting znode ACLs");
+        setZnodeAclsRecursive(baseZNode);
+      }
+    } catch(KeeperException.NoNodeException nne) {
+      return;
+    } catch(InterruptedException ie) {
+      interruptedException(ie);
+    } catch (IOException|KeeperException e) {
+      LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
+    }
+  }
+
+  /**
+   * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
+   * will be set last in case the master fails in between.
+   * @param znode
+   */
+  private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
+    List<String> children = recoverableZooKeeper.getChildren(znode, false);
+
+    for (String child : children) {
+      setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child));
+    }
+    List<ACL> acls = ZKUtil.createACL(this, znode, true);
+    LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
+    recoverableZooKeeper.setAcl(znode, acls, -1);
+  }
+
+  /**
+   * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
+   * @param acls acls from zookeeper
+   * @return whether ACLs are set for the base znode
+   * @throws IOException
+   */
+  private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
+    String superUser = conf.get("hbase.superuser");
+
+    // this assumes that current authenticated user is the same as zookeeper client user
+    // configured via JAAS
+    String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    if (acls.isEmpty()) {
+      return false;
+    }
+
+    for (ACL acl : acls) {
+      int perms = acl.getPerms();
+      Id id = acl.getId();
+      // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
+      // and one for the hbase user
+      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
+        if (perms != Perms.READ) {
+          return false;
+        }
+      } else if (superUser != null && new Id("sasl", superUser).equals(id)) {
+        if (perms != Perms.ALL) {
+          return false;
+        }
+      } else if (new Id("sasl", hbaseUser).equals(id)) {
+        if (perms != Perms.ALL) {
+          return false;
+        }
+      } else {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public String toString() {
     return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
@@ -317,7 +424,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
     String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
     if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID;
     // the non-default replicas are of the pattern meta-region-server-<replicaId>
-    String nonDefaultPattern = pattern + "-"; 
+    String nonDefaultPattern = pattern + "-";
     return Integer.parseInt(znode.substring(nonDefaultPattern.length()));
   }
 
@@ -565,6 +672,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
    *
    * @throws InterruptedException
    */
+  @Override
   public void close() {
     try {
       if (recoverableZooKeeper != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97e1510f/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java
new file mode 100644
index 0000000..29ea259
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class})
+public class TestZooKeeperWatcher {
+
+  @Test
+  public void testIsClientReadable() throws ZooKeeperConnectionException, IOException {
+    ZooKeeperWatcher watcher = new ZooKeeperWatcher(HBaseConfiguration.create(),
+      "testIsClientReadable", null, false);
+
+    assertTrue(watcher.isClientReadable(watcher.baseZNode));
+    assertTrue(watcher.isClientReadable(watcher.getZNodeForReplica(0)));
+    assertTrue(watcher.isClientReadable(watcher.getMasterAddressZNode()));
+    assertTrue(watcher.isClientReadable(watcher.clusterIdZNode));
+    assertTrue(watcher.isClientReadable(watcher.tableZNode));
+    assertTrue(watcher.isClientReadable(ZKUtil.joinZNode(watcher.tableZNode, "foo")));
+    assertTrue(watcher.isClientReadable(watcher.rsZNode));
+
+
+    assertFalse(watcher.isClientReadable(watcher.tableLockZNode));
+    assertFalse(watcher.isClientReadable(watcher.balancerZNode));
+    assertFalse(watcher.isClientReadable(watcher.clusterStateZNode));
+    assertFalse(watcher.isClientReadable(watcher.drainingZNode));
+    assertFalse(watcher.isClientReadable(watcher.recoveringRegionsZNode));
+    assertFalse(watcher.isClientReadable(watcher.splitLogZNode));
+    assertFalse(watcher.isClientReadable(watcher.backupMasterAddressesZNode));
+
+    watcher.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/97e1510f/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
new file mode 100644
index 0000000..c39056d
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.junit.experimental.categories.Category;
+
+/**
+ * An integration test which checks that the znodes in zookeeper and data in the FileSystem
+ * are protected for secure HBase deployments.
+ * This test is intended to be run on clusters with kerberos authorization for HBase and Zookeeper.
+ *
+ * If hbase.security.authentication is not set to kerberos, the test does not run unless -f is
+ * specified which bypasses the check. It is recommended to always run with -f on secure clusters
+ * so that the test checks the actual end result, not the configuration.
+ *
+ * The test should be run as hbase user with kinit / TGT cached since it accesses HDFS.
+ * <p>
+ * Example usage:
+ *   hbase org.apache.hadoop.hbase.test.IntegrationTestZnodeACLs -h
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestZKAndFSPermissions.class);
+  private String superUser;
+  private String masterPrincipal;
+  private boolean isForce;
+  private String fsPerms;
+  private boolean skipFSCheck;
+  private boolean skipZKCheck;
+
+  public static final String FORCE_CHECK_ARG = "f";
+  public static final String PRINCIPAL_ARG = "p";
+  public static final String SUPERUSER_ARG = "s";
+  public static final String FS_PERMS = "fs_perms";
+  public static final String SKIP_CHECK_FS = "skip_fs_check";
+  public static final String SKIP_CHECK_ZK = "skip_zk_check";
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+  }
+
+  @Override
+  protected void addOptions() {
+    addOptNoArg(FORCE_CHECK_ARG, "Whether to skip configuration lookup and assume a secure setup");
+    addOptWithArg(PRINCIPAL_ARG, "The principal for zk authorization");
+    addOptWithArg(SUPERUSER_ARG, "The principal for super user");
+    addOptWithArg(FS_PERMS,      "FS permissions, ex. 700, 750, etc. Defaults to 700");
+    addOptNoArg(SKIP_CHECK_FS, "Whether to skip checking FS permissions");
+    addOptNoArg(SKIP_CHECK_ZK,   "Whether to skip checking ZK permissions");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    isForce = cmd.hasOption(FORCE_CHECK_ARG);
+    masterPrincipal = getShortUserName(conf.get("hbase.master.kerberos.principal"));
+    superUser = cmd.getOptionValue(SUPERUSER_ARG, conf.get("hbase.superuser"));
+    masterPrincipal = cmd.getOptionValue(PRINCIPAL_ARG, masterPrincipal);
+    fsPerms = cmd.getOptionValue(FS_PERMS, "700");
+    skipFSCheck = cmd.hasOption(SKIP_CHECK_FS);
+    skipZKCheck = cmd.hasOption(SKIP_CHECK_ZK);
+  }
+
+  private String getShortUserName(String principal) {
+    for (int i = 0; i < principal.length(); i++) {
+      if (principal.charAt(i) == '/' || principal.charAt(i) == '@') {
+        return principal.substring(0, i);
+      }
+    }
+    return principal;
+  }
+
+  @Override
+  protected int doWork() throws Exception {
+    if (!isForce) {
+      if (!"kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication"))) {
+        LOG.warn("hbase.security.authentication is not kerberos, and -f is not supplied. Skip "
+            + "running the test");
+        return 0;
+      }
+    }
+
+    if (!skipZKCheck) {
+      testZNodeACLs();
+    } if (!skipFSCheck) {
+      testFSPerms();
+    }
+    return 0;
+  }
+
+  private void testZNodeACLs() throws IOException, KeeperException, InterruptedException {
+
+    ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "IntegrationTestZnodeACLs", null);
+    RecoverableZooKeeper zk = ZKUtil.connect(this.conf, watcher);
+
+    String baseZNode = watcher.baseZNode;
+
+    LOG.info("");
+    LOG.info("***********************************************************************************");
+    LOG.info("Checking ZK permissions, root znode: " + baseZNode);
+    LOG.info("***********************************************************************************");
+    LOG.info("");
+
+    checkZnodePermsRecursive(watcher, zk, baseZNode);
+
+    LOG.info("Checking ZK permissions: SUCCESS");
+  }
+
+  private void checkZnodePermsRecursive(ZooKeeperWatcher watcher,
+      RecoverableZooKeeper zk, String znode) throws KeeperException, InterruptedException {
+
+    boolean expectedWorldReadable = watcher.isClientReadable(znode);
+
+    assertZnodePerms(zk, znode, expectedWorldReadable);
+
+    try {
+      List<String> children = zk.getChildren(znode, false);
+
+      for (String child : children) {
+        checkZnodePermsRecursive(watcher, zk, ZKUtil.joinZNode(znode, child));
+      }
+    } catch (KeeperException ke) {
+      // if we are not authenticated for listChildren, it is fine.
+      if (ke.code() != Code.NOAUTH) {
+        throw ke;
+      }
+    }
+  }
+
+  private void assertZnodePerms(RecoverableZooKeeper zk, String znode,
+      boolean expectedWorldReadable) throws KeeperException, InterruptedException {
+    Stat stat = new Stat();
+    List<ACL> acls = zk.getZooKeeper().getACL(znode, stat);
+
+    LOG.info("Checking ACLs for znode znode:" + znode + " acls:" + acls);
+
+    for (ACL acl : acls) {
+      int perms = acl.getPerms();
+      Id id = acl.getId();
+      // We should only set at most 3 possible ACL for 3 Ids. One for everyone, one for superuser
+      // and one for the hbase user
+      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
+        // everyone should be set only if we are expecting this znode to be world readable
+        assertTrue(expectedWorldReadable);
+        // assert that anyone can only read
+        assertEquals(perms, Perms.READ);
+      } else if (superUser != null && new Id("sasl", superUser).equals(id)) {
+        // assert that super user has all the permissions
+        assertEquals(perms, Perms.ALL);
+      } else if (new Id("sasl", masterPrincipal).equals(id)) {
+        // hbase.master.kerberos.principal?
+        assertEquals(perms, Perms.ALL);
+      } else {
+        fail("An ACL is found which is not expected for the znode:" + znode + " , ACL:" + acl);
+      }
+    }
+  }
+
+  private void testFSPerms() throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+
+    LOG.info("");
+    LOG.info("***********************************************************************************");
+    LOG.info("Checking FS permissions for root dir:" + rootDir);
+    LOG.info("***********************************************************************************");
+    LOG.info("");
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    short expectedPerms = Short.valueOf(fsPerms, 8);
+
+    assertEquals(
+      FsPermission.createImmutable(expectedPerms),
+      fs.getFileStatus(rootDir).getPermission());
+
+    LOG.info("Checking FS permissions: SUCCESS");
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration configuration = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(configuration);
+    IntegrationTestZKAndFSPermissions tool = new IntegrationTestZKAndFSPermissions();
+    int ret = ToolRunner.run(configuration, tool, args);
+    System.exit(ret);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/97e1510f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 3b62336..01d247a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -808,6 +808,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     // master initialization. See HBASE-5916.
     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
 
+    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
+    status.setStatus("Checking ZNode ACLs");
+    zooKeeper.checkAndSetZNodeAcls();
+
+    status.setStatus("Calling postStartMaster coprocessors");
     if (this.cpHost != null) {
       // don't let cp initialization errors kill the master
       try {