You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2022/01/11 00:09:00 UTC

[hbase] branch branch-2.5 updated: HBASE-26616 Refactor code related to ZooKeeper authentication (#3973)

This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 9ef7f59  HBASE-26616 Refactor code related to ZooKeeper authentication (#3973)
9ef7f59 is described below

commit 9ef7f5977ec8a0967968c84090b0d45bae54e625
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Thu Jan 6 17:04:15 2022 -0800

    HBASE-26616 Refactor code related to ZooKeeper authentication (#3973)
    
    This refactor reduces the size and scope of the `ZKUtil` class. The core of this refactor is
    moving the `login*` methods from `ZKUtil` into their own class, `ZKAuthentication`. The class
    `JaasConfiguration` is also moved along with them.
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/master/HMasterCommandLine.java    |   4 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |   3 +-
 .../hadoop/hbase/zookeeper/TestZooKeeperACL.java   |  11 +-
 .../apache/hadoop/hbase/zookeeper/HQuorumPeer.java |   2 +-
 .../apache/hadoop/hbase/zookeeper/ZKAclReset.java  |   3 +-
 .../hadoop/hbase/zookeeper/ZKAuthentication.java   | 245 +++++++++++++++++
 .../org/apache/hadoop/hbase/zookeeper/ZKUtil.java  | 295 +--------------------
 .../apache/hadoop/hbase/zookeeper/ZKWatcher.java   |  59 ++++-
 .../hadoop/hbase/zookeeper/TestZKUtilNoServer.java |   8 +-
 9 files changed, 328 insertions(+), 302 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
index 0f0a2b6..83baf15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.ServerCommandLine;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -214,7 +214,7 @@ public class HMasterCommandLine extends ServerCommandLine {
         }
 
         // login the zookeeper server principal (if using security)
-        ZKUtil.loginServer(conf, HConstants.ZK_SERVER_KEYTAB_FILE,
+        ZKAuthentication.loginServer(conf, HConstants.ZK_SERVER_KEYTAB_FILE,
           HConstants.ZK_SERVER_KERBEROS_PRINCIPAL, null);
         int localZKClusterSessionTimeout =
           conf.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", 10*1000);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3b039c1..c445ffa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -181,6 +181,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -637,7 +638,7 @@ public class HRegionServer extends Thread implements
       rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
 
       // login the zookeeper client principal (if using security)
-      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
+      ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
           HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
       // login the server principal (if using secure Hadoop)
       login(userProvider, hostName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java
index 993cf7f..d494dd4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java
@@ -275,7 +275,7 @@ public class TestZooKeeperACL {
   @Test
   public void testIsZooKeeperSecure() throws Exception {
     boolean testJaasConfig =
-        ZKUtil.isSecureZooKeeper(new Configuration(TEST_UTIL.getConfiguration()));
+        ZKAuthentication.isSecureZooKeeper(new Configuration(TEST_UTIL.getConfiguration()));
     assertEquals(testJaasConfig, secureZKAvailable);
     // Define Jaas configuration without ZooKeeper Jaas config
     File saslConfFile = File.createTempFile("tmp", "fakeJaas.conf");
@@ -286,7 +286,8 @@ public class TestZooKeeperACL {
     System.setProperty("java.security.auth.login.config",
         saslConfFile.getAbsolutePath());
 
-    testJaasConfig = ZKUtil.isSecureZooKeeper(new Configuration(TEST_UTIL.getConfiguration()));
+    testJaasConfig = ZKAuthentication.isSecureZooKeeper(
+      new Configuration(TEST_UTIL.getConfiguration()));
     assertFalse(testJaasConfig);
     saslConfFile.delete();
   }
@@ -300,13 +301,13 @@ public class TestZooKeeperACL {
     javax.security.auth.login.Configuration.setConfiguration(new DummySecurityConfiguration());
 
     Configuration config = new Configuration(HBaseConfiguration.create());
-    boolean testJaasConfig = ZKUtil.isSecureZooKeeper(config);
+    boolean testJaasConfig = ZKAuthentication.isSecureZooKeeper(config);
     assertFalse(testJaasConfig);
 
     // Now set authentication scheme to Kerberos still it should return false
     // because no configuration set
     config.set("hbase.security.authentication", "kerberos");
-    testJaasConfig = ZKUtil.isSecureZooKeeper(config);
+    testJaasConfig = ZKAuthentication.isSecureZooKeeper(config);
     assertFalse(testJaasConfig);
 
     // Now set programmatic options related to security
@@ -314,7 +315,7 @@ public class TestZooKeeperACL {
     config.set(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, "dummy");
     config.set(HConstants.ZK_SERVER_KEYTAB_FILE, "/dummy/file");
     config.set(HConstants.ZK_SERVER_KERBEROS_PRINCIPAL, "dummy");
-    testJaasConfig = ZKUtil.isSecureZooKeeper(config);
+    testJaasConfig = ZKAuthentication.isSecureZooKeeper(config);
     assertTrue(testJaasConfig);
   }
 
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
index 86f9451..263e5d8 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
@@ -74,7 +74,7 @@ public final class HQuorumPeer {
       zkConfig.parseProperties(zkProperties);
 
       // login the zookeeper server principal (if using security)
-      ZKUtil.loginServer(conf, HConstants.ZK_SERVER_KEYTAB_FILE,
+      ZKAuthentication.loginServer(conf, HConstants.ZK_SERVER_KEYTAB_FILE,
         HConstants.ZK_SERVER_KERBEROS_PRINCIPAL,
         zkConfig.getClientPortAddress().getHostName());
 
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
index b66d5b1..81d3fc7 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.zookeeper;
 
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -61,7 +60,7 @@ public class ZKAclReset extends Configured implements Tool {
       zk.setACL(znode, ZooDefs.Ids.OPEN_ACL_UNSAFE, -1);
     } else {
       LOG.info(" - set ACLs for {}", znode);
-      zk.setACL(znode, ZKUtil.createACL(zkw, znode, true), -1);
+      zk.setACL(znode, zkw.createACL(znode, true), -1);
     }
   }
 
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAuthentication.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAuthentication.java
new file mode 100644
index 0000000..7f04490
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAuthentication.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.server.ZooKeeperSaslServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides ZooKeeper authentication services for both client and server processes.
+ */
+@InterfaceAudience.Private
+public final class ZKAuthentication {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKAuthentication.class);
+
+  private ZKAuthentication() {}
+
+  /**
+   * Log in the current zookeeper server process using the given configuration
+   * keys for the credential file and login principal.
+   *
+   * <p><strong>This is only applicable when running on secure hbase</strong>
+   * On regular HBase (without security features), this will safely be ignored.
+   * </p>
+   *
+   * @param conf The configuration data to use
+   * @param keytabFileKey Property key used to configure the path to the credential file
+   * @param userNameKey Property key used to configure the login principal
+   * @param hostname Current hostname to use in any credentials
+   * @throws IOException underlying exception from SecurityUtil.login() call
+   */
+  public static void loginServer(Configuration conf, String keytabFileKey,
+      String userNameKey, String hostname) throws IOException {
+    login(conf, keytabFileKey, userNameKey, hostname,
+          ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
+          JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME);
+  }
+
+  /**
+   * Log in the current zookeeper client using the given configuration
+   * keys for the credential file and login principal.
+   *
+   * <p><strong>This is only applicable when running on secure hbase</strong>
+   * On regular HBase (without security features), this will safely be ignored.
+   * </p>
+   *
+   * @param conf The configuration data to use
+   * @param keytabFileKey Property key used to configure the path to the credential file
+   * @param userNameKey Property key used to configure the login principal
+   * @param hostname Current hostname to use in any credentials
+   * @throws IOException underlying exception from SecurityUtil.login() call
+   */
+  public static void loginClient(Configuration conf, String keytabFileKey,
+      String userNameKey, String hostname) throws IOException {
+    login(conf, keytabFileKey, userNameKey, hostname,
+          ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
+          JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME);
+  }
+
+  /**
+   * Log in the current process using the given configuration keys for the
+   * credential file and login principal.
+   *
+   * <p><strong>This is only applicable when running on secure hbase</strong>
+   * On regular HBase (without security features), this will safely be ignored.
+   * </p>
+   *
+   * @param conf The configuration data to use
+   * @param keytabFileKey Property key used to configure the path to the credential file
+   * @param userNameKey Property key used to configure the login principal
+   * @param hostname Current hostname to use in any credentials
+   * @param loginContextProperty property name to expose the entry name
+   * @param loginContextName jaas entry name
+   * @throws IOException underlying exception from SecurityUtil.login() call
+   */
+  private static void login(Configuration conf, String keytabFileKey,
+      String userNameKey, String hostname,
+      String loginContextProperty, String loginContextName)
+      throws IOException {
+    if (!isSecureZooKeeper(conf)) {
+      return;
+    }
+
+    // User has specified a jaas.conf, keep this one as the good one.
+    // HBASE_OPTS="-Djava.security.auth.login.config=jaas.conf"
+    if (System.getProperty("java.security.auth.login.config") != null) {
+      return;
+    }
+
+    // No keytab specified, no auth
+    String keytabFilename = conf.get(keytabFileKey);
+    if (keytabFilename == null) {
+      LOG.warn("no keytab specified for: {}", keytabFileKey);
+      return;
+    }
+
+    String principalConfig = conf.get(userNameKey, System.getProperty("user.name"));
+    String principalName = SecurityUtil.getServerPrincipal(principalConfig, hostname);
+
+    // Initialize the "jaas.conf" for keyTab/principal,
+    // If keyTab is not specified use the Ticket Cache.
+    // and set the zookeeper login context name.
+    JaasConfiguration jaasConf = new JaasConfiguration(loginContextName,
+        principalName, keytabFilename);
+    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+    System.setProperty(loginContextProperty, loginContextName);
+  }
+
+  /**
+   * Returns {@code true} when secure authentication is enabled
+   * (whether {@code hbase.security.authentication} is set to
+   * "{@code kerberos}").
+   */
+  public static boolean isSecureZooKeeper(Configuration conf) {
+    // Detection for embedded HBase client with jaas configuration
+    // defined for third party programs.
+    try {
+      javax.security.auth.login.Configuration testConfig =
+          javax.security.auth.login.Configuration.getConfiguration();
+      if (testConfig.getAppConfigurationEntry("Client") == null
+          && testConfig.getAppConfigurationEntry(
+            JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME) == null
+          && testConfig.getAppConfigurationEntry(
+              JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME) == null
+          && conf.get(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL) == null
+          && conf.get(HConstants.ZK_SERVER_KERBEROS_PRINCIPAL) == null) {
+
+        return false;
+      }
+    } catch(Exception e) {
+      // No Jaas configuration defined.
+      return false;
+    }
+
+    // Master & RSs uses hbase.zookeeper.client.*
+    return "kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication"));
+  }
+
+  /**
+   * A JAAS configuration that defines the login modules that we want to use for ZooKeeper login.
+   */
+  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+    private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class);
+
+    public static final String SERVER_KEYTAB_KERBEROS_CONFIG_NAME =
+      "zookeeper-server-keytab-kerberos";
+    public static final String CLIENT_KEYTAB_KERBEROS_CONFIG_NAME =
+      "zookeeper-client-keytab-kerberos";
+
+    private static final Map<String, String> BASIC_JAAS_OPTIONS = new HashMap<>();
+
+    static {
+      String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
+      if ("true".equalsIgnoreCase(jaasEnvVar)) {
+        BASIC_JAAS_OPTIONS.put("debug", "true");
+      }
+    }
+
+    private static final Map<String, String> KEYTAB_KERBEROS_OPTIONS = new HashMap<>();
+
+    static {
+      KEYTAB_KERBEROS_OPTIONS.put("doNotPrompt", "true");
+      KEYTAB_KERBEROS_OPTIONS.put("storeKey", "true");
+      KEYTAB_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
+      KEYTAB_KERBEROS_OPTIONS.putAll(BASIC_JAAS_OPTIONS);
+    }
+
+    private static final AppConfigurationEntry KEYTAB_KERBEROS_LOGIN =
+      new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
+        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, KEYTAB_KERBEROS_OPTIONS);
+
+    private static final AppConfigurationEntry[] KEYTAB_KERBEROS_CONF =
+      new AppConfigurationEntry[] { KEYTAB_KERBEROS_LOGIN };
+
+    private javax.security.auth.login.Configuration baseConfig;
+    private final String loginContextName;
+    private final boolean useTicketCache;
+    private final String keytabFile;
+    private final String principal;
+
+    public JaasConfiguration(String loginContextName, String principal, String keytabFile) {
+      this(loginContextName, principal, keytabFile, keytabFile == null || keytabFile.length() == 0);
+    }
+
+    private JaasConfiguration(String loginContextName, String principal, String keytabFile,
+      boolean useTicketCache) {
+      try {
+        this.baseConfig = javax.security.auth.login.Configuration.getConfiguration();
+      } catch (SecurityException e) {
+        this.baseConfig = null;
+      }
+      this.loginContextName = loginContextName;
+      this.useTicketCache = useTicketCache;
+      this.keytabFile = keytabFile;
+      this.principal = principal;
+      LOG.info(
+        "JaasConfiguration loginContextName={} principal={} useTicketCache={} keytabFile={}",
+        loginContextName, principal, useTicketCache, keytabFile);
+    }
+
+    @Override public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+      if (loginContextName.equals(appName)) {
+        if (!useTicketCache) {
+          KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
+          KEYTAB_KERBEROS_OPTIONS.put("useKeyTab", "true");
+        }
+        KEYTAB_KERBEROS_OPTIONS.put("principal", principal);
+        KEYTAB_KERBEROS_OPTIONS.put("useTicketCache", useTicketCache ? "true" : "false");
+        return KEYTAB_KERBEROS_CONF;
+      }
+
+      if (baseConfig != null) {
+        return baseConfig.getAppConfigurationEntry(appName);
+      }
+
+      return (null);
+    }
+  }
+}
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index e3337f5..d0569dd 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -32,28 +32,19 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
@@ -61,22 +52,14 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
-import org.apache.zookeeper.server.ZooKeeperSaslServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 
@@ -143,175 +126,6 @@ public final class ZKUtil {
         retry, retryIntervalMillis, maxSleepTime, identifier, multiMaxSize);
   }
 
-  /**
-   * Log in the current zookeeper server process using the given configuration
-   * keys for the credential file and login principal.
-   *
-   * <p><strong>This is only applicable when running on secure hbase</strong>
-   * On regular HBase (without security features), this will safely be ignored.
-   * </p>
-   *
-   * @param conf The configuration data to use
-   * @param keytabFileKey Property key used to configure the path to the credential file
-   * @param userNameKey Property key used to configure the login principal
-   * @param hostname Current hostname to use in any credentials
-   * @throws IOException underlying exception from SecurityUtil.login() call
-   */
-  public static void loginServer(Configuration conf, String keytabFileKey,
-      String userNameKey, String hostname) throws IOException {
-    login(conf, keytabFileKey, userNameKey, hostname,
-          ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
-          JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME);
-  }
-
-  /**
-   * Log in the current zookeeper client using the given configuration
-   * keys for the credential file and login principal.
-   *
-   * <p><strong>This is only applicable when running on secure hbase</strong>
-   * On regular HBase (without security features), this will safely be ignored.
-   * </p>
-   *
-   * @param conf The configuration data to use
-   * @param keytabFileKey Property key used to configure the path to the credential file
-   * @param userNameKey Property key used to configure the login principal
-   * @param hostname Current hostname to use in any credentials
-   * @throws IOException underlying exception from SecurityUtil.login() call
-   */
-  public static void loginClient(Configuration conf, String keytabFileKey,
-      String userNameKey, String hostname) throws IOException {
-    login(conf, keytabFileKey, userNameKey, hostname,
-          ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
-          JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME);
-  }
-
-  /**
-   * Log in the current process using the given configuration keys for the
-   * credential file and login principal.
-   *
-   * <p><strong>This is only applicable when running on secure hbase</strong>
-   * On regular HBase (without security features), this will safely be ignored.
-   * </p>
-   *
-   * @param conf The configuration data to use
-   * @param keytabFileKey Property key used to configure the path to the credential file
-   * @param userNameKey Property key used to configure the login principal
-   * @param hostname Current hostname to use in any credentials
-   * @param loginContextProperty property name to expose the entry name
-   * @param loginContextName jaas entry name
-   * @throws IOException underlying exception from SecurityUtil.login() call
-   */
-  private static void login(Configuration conf, String keytabFileKey,
-      String userNameKey, String hostname,
-      String loginContextProperty, String loginContextName)
-      throws IOException {
-    if (!isSecureZooKeeper(conf)) {
-      return;
-    }
-
-    // User has specified a jaas.conf, keep this one as the good one.
-    // HBASE_OPTS="-Djava.security.auth.login.config=jaas.conf"
-    if (System.getProperty("java.security.auth.login.config") != null) {
-      return;
-    }
-
-    // No keytab specified, no auth
-    String keytabFilename = conf.get(keytabFileKey);
-    if (keytabFilename == null) {
-      LOG.warn("no keytab specified for: {}", keytabFileKey);
-      return;
-    }
-
-    String principalConfig = conf.get(userNameKey, System.getProperty("user.name"));
-    String principalName = SecurityUtil.getServerPrincipal(principalConfig, hostname);
-
-    // Initialize the "jaas.conf" for keyTab/principal,
-    // If keyTab is not specified use the Ticket Cache.
-    // and set the zookeeper login context name.
-    JaasConfiguration jaasConf = new JaasConfiguration(loginContextName,
-        principalName, keytabFilename);
-    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
-    System.setProperty(loginContextProperty, loginContextName);
-  }
-
-  /**
-   * A JAAS configuration that defines the login modules that we want to use for login.
-   */
-  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
-    private static final String SERVER_KEYTAB_KERBEROS_CONFIG_NAME =
-      "zookeeper-server-keytab-kerberos";
-    private static final String CLIENT_KEYTAB_KERBEROS_CONFIG_NAME =
-      "zookeeper-client-keytab-kerberos";
-
-    private static final Map<String, String> BASIC_JAAS_OPTIONS = new HashMap<>();
-    static {
-      String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
-      if ("true".equalsIgnoreCase(jaasEnvVar)) {
-        BASIC_JAAS_OPTIONS.put("debug", "true");
-      }
-    }
-
-    private static final Map<String,String> KEYTAB_KERBEROS_OPTIONS = new HashMap<>();
-    static {
-      KEYTAB_KERBEROS_OPTIONS.put("doNotPrompt", "true");
-      KEYTAB_KERBEROS_OPTIONS.put("storeKey", "true");
-      KEYTAB_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
-      KEYTAB_KERBEROS_OPTIONS.putAll(BASIC_JAAS_OPTIONS);
-    }
-
-    private static final AppConfigurationEntry KEYTAB_KERBEROS_LOGIN =
-      new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
-                                LoginModuleControlFlag.REQUIRED,
-                                KEYTAB_KERBEROS_OPTIONS);
-
-    private static final AppConfigurationEntry[] KEYTAB_KERBEROS_CONF =
-      new AppConfigurationEntry[]{KEYTAB_KERBEROS_LOGIN};
-
-    private javax.security.auth.login.Configuration baseConfig;
-    private final String loginContextName;
-    private final boolean useTicketCache;
-    private final String keytabFile;
-    private final String principal;
-
-    public JaasConfiguration(String loginContextName, String principal, String keytabFile) {
-      this(loginContextName, principal, keytabFile, keytabFile == null || keytabFile.length() == 0);
-    }
-
-    private JaasConfiguration(String loginContextName, String principal,
-                             String keytabFile, boolean useTicketCache) {
-      try {
-        this.baseConfig = javax.security.auth.login.Configuration.getConfiguration();
-      } catch (SecurityException e) {
-        this.baseConfig = null;
-      }
-      this.loginContextName = loginContextName;
-      this.useTicketCache = useTicketCache;
-      this.keytabFile = keytabFile;
-      this.principal = principal;
-      LOG.info("JaasConfiguration loginContextName={} principal={} useTicketCache={} keytabFile={}",
-        loginContextName, principal, useTicketCache, keytabFile);
-    }
-
-    @Override
-    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
-      if (loginContextName.equals(appName)) {
-        if (!useTicketCache) {
-          KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
-          KEYTAB_KERBEROS_OPTIONS.put("useKeyTab", "true");
-        }
-        KEYTAB_KERBEROS_OPTIONS.put("principal", principal);
-        KEYTAB_KERBEROS_OPTIONS.put("useTicketCache", useTicketCache ? "true" : "false");
-        return KEYTAB_KERBEROS_CONF;
-      }
-
-      if (baseConfig != null) {
-        return baseConfig.getAppConfigurationEntry(appName);
-      }
-
-      return(null);
-    }
-  }
-
   //
   // Helper methods
   //
@@ -902,86 +716,6 @@ public final class ZKUtil {
     setData(zkw, sd.getPath(), sd.getData(), sd.getVersion());
   }
 
-  /**
-   * Returns whether or not secure authentication is enabled
-   * (whether <code>hbase.security.authentication</code> is set to
-   * <code>kerberos</code>.
-   */
-  public static boolean isSecureZooKeeper(Configuration conf) {
-    // Detection for embedded HBase client with jaas configuration
-    // defined for third party programs.
-    try {
-      javax.security.auth.login.Configuration testConfig =
-          javax.security.auth.login.Configuration.getConfiguration();
-      if (testConfig.getAppConfigurationEntry("Client") == null
-          && testConfig.getAppConfigurationEntry(
-            JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME) == null
-          && testConfig.getAppConfigurationEntry(
-              JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME) == null
-          && conf.get(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL) == null
-          && conf.get(HConstants.ZK_SERVER_KERBEROS_PRINCIPAL) == null) {
-
-        return false;
-      }
-    } catch(Exception e) {
-      // No Jaas configuration defined.
-      return false;
-    }
-
-    // Master & RSs uses hbase.zookeeper.client.*
-    return "kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication"));
-  }
-
-  private static ArrayList<ACL> createACL(ZKWatcher zkw, String node) {
-    return createACL(zkw, node, isSecureZooKeeper(zkw.getConfiguration()));
-  }
-
-  public static ArrayList<ACL> createACL(ZKWatcher zkw, String node,
-                                         boolean isSecureZooKeeper) {
-    if (!node.startsWith(zkw.getZNodePaths().baseZNode)) {
-      return Ids.OPEN_ACL_UNSAFE;
-    }
-    if (isSecureZooKeeper) {
-      ArrayList<ACL> acls = new ArrayList<>();
-      // add permission to hbase supper user
-      String[] superUsers = zkw.getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY);
-      String hbaseUser = null;
-      try {
-        hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
-      } catch (IOException e) {
-        LOG.debug("Could not acquire current User.", e);
-      }
-      if (superUsers != null) {
-        List<String> groups = new ArrayList<>();
-        for (String user : superUsers) {
-          if (AuthUtil.isGroupPrincipal(user)) {
-            // TODO: Set node ACL for groups when ZK supports this feature
-            groups.add(user);
-          } else {
-            if(!user.equals(hbaseUser)) {
-              acls.add(new ACL(Perms.ALL, new Id("sasl", user)));
-            }
-          }
-        }
-        if (!groups.isEmpty()) {
-          LOG.warn("Znode ACL setting for group {} is skipped, ZooKeeper doesn't support this " +
-            "feature presently.", groups);
-        }
-      }
-      // Certain znodes are accessed directly by the client,
-      // so they must be readable by non-authenticated clients
-      if (zkw.getZNodePaths().isClientReadable(node)) {
-        acls.addAll(Ids.CREATOR_ALL_ACL);
-        acls.addAll(Ids.READ_ACL_UNSAFE);
-      } else {
-        acls.addAll(Ids.CREATOR_ALL_ACL);
-      }
-      return acls;
-    } else {
-      return Ids.OPEN_ACL_UNSAFE;
-    }
-  }
-
   //
   // Node creation
   //
@@ -1004,12 +738,11 @@ public final class ZKUtil {
    * @return true if node created, false if not, watch set in both cases
    * @throws KeeperException if unexpected zookeeper exception
    */
-  public static boolean createEphemeralNodeAndWatch(ZKWatcher zkw,
-      String znode, byte [] data)
+  public static boolean createEphemeralNodeAndWatch(ZKWatcher zkw, String znode, byte [] data)
     throws KeeperException {
     boolean ret = true;
     try {
-      zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
+      zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode),
           CreateMode.EPHEMERAL);
     } catch (KeeperException.NodeExistsException nee) {
       ret = false;
@@ -1049,7 +782,7 @@ public final class ZKUtil {
     throws KeeperException {
     boolean ret = true;
     try {
-      zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
+      zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode),
           CreateMode.PERSISTENT);
     } catch (KeeperException.NodeExistsException nee) {
       ret = false;
@@ -1082,17 +815,14 @@ public final class ZKUtil {
    */
   public static String createNodeIfNotExistsNoWatch(ZKWatcher zkw, String znode, byte[] data,
       CreateMode createMode) throws KeeperException {
-    String createdZNode = null;
     try {
-      createdZNode = zkw.getRecoverableZooKeeper().create(znode, data,
-          createACL(zkw, znode), createMode);
+      return zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode), createMode);
     } catch (KeeperException.NodeExistsException nee) {
       return znode;
     } catch (InterruptedException e) {
       zkw.interruptedException(e);
       return null;
     }
-    return createdZNode;
   }
 
   /**
@@ -1115,8 +845,8 @@ public final class ZKUtil {
       String znode, byte [] data)
     throws KeeperException, KeeperException.NodeExistsException {
     try {
-      zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
-          CreateMode.PERSISTENT);
+      zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode),
+        CreateMode.PERSISTENT);
       Stat stat = zkw.getRecoverableZooKeeper().exists(znode, zkw);
       if (stat == null){
         // Likely a race condition. Someone deleted the znode.
@@ -1148,7 +878,7 @@ public final class ZKUtil {
       String znode, byte [] data, final AsyncCallback.StringCallback cb,
       final Object ctx) {
     zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data,
-        createACL(zkw, znode), CreateMode.PERSISTENT, cb, ctx);
+        zkw.createACL(znode), CreateMode.PERSISTENT, cb, ctx);
   }
 
   /**
@@ -1193,8 +923,9 @@ public final class ZKUtil {
       if (zk.exists(znode, false) == null) {
         zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags()));
       }
-    } catch(KeeperException.NodeExistsException nee) {
-    } catch(KeeperException.NoAuthException nee){
+    } catch (KeeperException.NodeExistsException nee) {
+      // pass
+    } catch (KeeperException.NoAuthException nee) {
       try {
         if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) {
           // If we failed to create the file and it does not already exist.
@@ -1203,7 +934,7 @@ public final class ZKUtil {
       } catch (InterruptedException ie) {
         zkw.interruptedException(ie);
       }
-    } catch(InterruptedException ie) {
+    } catch (InterruptedException ie) {
       zkw.interruptedException(ie);
     }
   }
@@ -1243,7 +974,7 @@ public final class ZKUtil {
       if(znode == null) {
         return;
       }
-      zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
+      zkw.getRecoverableZooKeeper().create(znode, data, zkw.createACL(znode),
           CreateMode.PERSISTENT);
     } catch(KeeperException.NodeExistsException nee) {
       return;
@@ -1746,7 +1477,7 @@ public final class ZKUtil {
 
     if (op instanceof CreateAndFailSilent) {
       CreateAndFailSilent cafs = (CreateAndFailSilent)op;
-      return Op.create(cafs.getPath(), cafs.getData(), createACL(zkw, cafs.getPath()),
+      return Op.create(cafs.getPath(), cafs.getData(), zkw.createACL(cafs.getPath()),
         CreateMode.PERSISTENT);
     } else if (op instanceof DeleteNodeFailSilent) {
       DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
index 7a9fdd6..ed43fa5 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -50,6 +49,7 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
@@ -68,11 +68,11 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
 
   // Identifier for this watcher (for logging only).  It is made of the prefix
   // passed on construction and the zookeeper sessionid.
-  private String prefix;
+  private final String prefix;
   private String identifier;
 
   // zookeeper quorum
-  private String quorum;
+  private final String quorum;
 
   // zookeeper connection
   private final RecoverableZooKeeper recoverableZooKeeper;
@@ -196,6 +196,55 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
         HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS);
   }
 
+  public List<ACL> createACL(String node) {
+    return createACL(node, ZKAuthentication.isSecureZooKeeper(getConfiguration()));
+  }
+
+  public List<ACL> createACL(String node, boolean isSecureZooKeeper) {
+    if (!node.startsWith(getZNodePaths().baseZNode)) {
+      return Ids.OPEN_ACL_UNSAFE;
+    }
+    if (isSecureZooKeeper) {
+      ArrayList<ACL> acls = new ArrayList<>();
+      // add permission to hbase supper user
+      String[] superUsers = getConfiguration().getStrings(Superusers.SUPERUSER_CONF_KEY);
+      String hbaseUser = null;
+      try {
+        hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException e) {
+        LOG.debug("Could not acquire current User.", e);
+      }
+      if (superUsers != null) {
+        List<String> groups = new ArrayList<>();
+        for (String user : superUsers) {
+          if (AuthUtil.isGroupPrincipal(user)) {
+            // TODO: Set node ACL for groups when ZK supports this feature
+            groups.add(user);
+          } else {
+            if(!user.equals(hbaseUser)) {
+              acls.add(new ACL(Perms.ALL, new Id("sasl", user)));
+            }
+          }
+        }
+        if (!groups.isEmpty()) {
+          LOG.warn("Znode ACL setting for group {} is skipped, ZooKeeper doesn't support this " +
+            "feature presently.", groups);
+        }
+      }
+      // Certain znodes are accessed directly by the client,
+      // so they must be readable by non-authenticated clients
+      if (getZNodePaths().isClientReadable(node)) {
+        acls.addAll(Ids.CREATOR_ALL_ACL);
+        acls.addAll(Ids.READ_ACL_UNSAFE);
+      } else {
+        acls.addAll(Ids.CREATOR_ALL_ACL);
+      }
+      return acls;
+    } else {
+      return Ids.OPEN_ACL_UNSAFE;
+    }
+  }
+
   private void createBaseZNodes() throws ZooKeeperConnectionException {
     try {
       // Create all the necessary "directories" of znodes
@@ -219,7 +268,7 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
    * perms.
    */
   public void checkAndSetZNodeAcls() {
-    if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
+    if (!ZKAuthentication.isSecureZooKeeper(getConfiguration())) {
       LOG.info("not a secure deployment, proceeding");
       return;
     }
@@ -253,7 +302,7 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
     for (String child : children) {
       setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child));
     }
-    List<ACL> acls = ZKUtil.createACL(this, znode, true);
+    List<ACL> acls = createACL(znode, true);
     LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls);
     recoverableZooKeeper.setAcl(znode, acls, -1);
   }
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtilNoServer.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtilNoServer.java
index b2f8205..d429a70 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtilNoServer.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtilNoServer.java
@@ -52,7 +52,7 @@ public class TestZKUtilNoServer {
     conf.set(Superusers.SUPERUSER_CONF_KEY, "user1");
     String node = "/hbase/testUnsecure";
     ZKWatcher watcher = new ZKWatcher(conf, node, null, false);
-    List<ACL> aclList = ZKUtil.createACL(watcher, node, false);
+    List<ACL> aclList = watcher.createACL(node, false);
     assertEquals(1, aclList.size());
     assertTrue(aclList.contains(Ids.OPEN_ACL_UNSAFE.iterator().next()));
   }
@@ -63,7 +63,7 @@ public class TestZKUtilNoServer {
     conf.set(Superusers.SUPERUSER_CONF_KEY, "user1");
     String node = "/hbase/testSecuritySingleSuperuser";
     ZKWatcher watcher = new ZKWatcher(conf, node, null, false);
-    List<ACL> aclList = ZKUtil.createACL(watcher, node, true);
+    List<ACL> aclList = watcher.createACL(node, true);
     assertEquals(2, aclList.size()); // 1+1, since ACL will be set for the creator by default
     assertTrue(aclList.contains(new ACL(Perms.ALL, new Id("sasl", "user1"))));
     assertTrue(aclList.contains(Ids.CREATOR_ALL_ACL.iterator().next()));
@@ -75,7 +75,7 @@ public class TestZKUtilNoServer {
     conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3");
     String node = "/hbase/testCreateACL";
     ZKWatcher watcher = new ZKWatcher(conf, node, null, false);
-    List<ACL> aclList = ZKUtil.createACL(watcher, node, true);
+    List<ACL> aclList = watcher.createACL(node, true);
     assertEquals(4, aclList.size()); // 3+1, since ACL will be set for the creator by default
     assertFalse(aclList.contains(new ACL(Perms.ALL, new Id("sasl", "@group1"))));
     assertFalse(aclList.contains(new ACL(Perms.ALL, new Id("sasl", "@group2"))));
@@ -91,7 +91,7 @@ public class TestZKUtilNoServer {
     UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("user4"));
     String node = "/hbase/testCreateACL";
     ZKWatcher watcher = new ZKWatcher(conf, node, null, false);
-    List<ACL> aclList = ZKUtil.createACL(watcher, node, true);
+    List<ACL> aclList = watcher.createACL(node, true);
     assertEquals(3, aclList.size()); // 3, since service user the same as one of superuser
     assertFalse(aclList.contains(new ACL(Perms.ALL, new Id("sasl", "@group1"))));
     assertTrue(aclList.contains(new ACL(Perms.ALL, new Id("auth", ""))));