You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/03/26 08:32:34 UTC

[hive] branch master updated: Revert "HIVE-23045: Zookeeper SSL/TLS support (Peter Varga via Denys Kuzmenko)"

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e85586d  Revert "HIVE-23045: Zookeeper SSL/TLS support (Peter Varga via Denys Kuzmenko)"
e85586d is described below

commit e85586d4dbaa1f8953c69fd3b7e5580568f2d3ae
Author: Denys Kuzmenko <dk...@apache.org>
AuthorDate: Thu Mar 26 09:32:14 2020 +0100

    Revert "HIVE-23045: Zookeeper SSL/TLS support (Peter Varga via Denys Kuzmenko)"
    
    This reverts commit cb4427e1eaaf27029a397acadc62b8dddcb0437e.
---
 HIVE-23045.10.patch                                | 1742 --------------------
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   69 +-
 .../hcatalog/templeton/tool/ZooKeeperStorage.java  |   40 +-
 ...eTestBase.java => TestZooKeeperTokenStore.java} |   71 +-
 .../security/TestZookeeperTokenStorePlain.java     |   35 -
 .../TestZookeeperTokenStoreSSLEnabled.java         |   35 -
 .../org/apache/hive/jdbc/TestRestrictedList.java   |    4 -
 .../org/apache/hive/jdbc/TestServiceDiscovery.java |    8 +-
 ...ava => TestInformationSchemaWithPrivilege.java} |   40 +-
 ...formationSchemaWithPrivilegeZookeeperPlain.java |   35 -
 ...InformationSchemaWithPrivilegeZookeeperSSL.java |   35 -
 jdbc/src/java/org/apache/hive/jdbc/Utils.java      |   57 +-
 .../hive/jdbc/ZooKeeperHiveClientHelper.java       |   34 +-
 .../hadoop/hive/registry/impl/ZkRegistryBase.java  |   52 +-
 .../zookeeper/CuratorFrameworkSingleton.java       |   12 +-
 .../hive/testutils/MiniZooKeeperCluster.java       |   72 +-
 .../apache/hive/service/server/HiveServer2.java    |   11 +-
 .../hadoop/hive/common/SSLZookeeperFactory.java    |   78 -
 .../hadoop/hive/common/ZooKeeperHiveHelper.java    |  227 +--
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   66 +-
 .../security/MetastoreDelegationTokenManager.java  |   10 -
 .../metastore/security/ZooKeeperTokenStore.java    |   47 +-
 22 files changed, 195 insertions(+), 2585 deletions(-)

diff --git a/HIVE-23045.10.patch b/HIVE-23045.10.patch
deleted file mode 100644
index ce2fd49..0000000
--- a/HIVE-23045.10.patch
+++ /dev/null
@@ -1,1742 +0,0 @@
-diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
-index d50912b4e2..34df01e60e 100644
---- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
-+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
-@@ -2672,6 +2672,25 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
-         new TimeValidator(TimeUnit.MILLISECONDS),
-         "Initial amount of time (in milliseconds) to wait between retries\n" +
-         "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
-+    HIVE_ZOOKEEPER_SSL_ENABLE("hive.zookeeper.ssl.client.enable", false,
-+        "Set client to use TLS when connecting to ZooKeeper.  An explicit value overrides any value set via the " +
-+            "zookeeper.client.secure system property (note the different name).  Defaults to false if neither is set."),
-+    HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION("hive.zookeeper.ssl.keystore.location", "",
-+        "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
-+            "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
-+            "system property (note the camelCase)."),
-+    HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("hive.zookeeper.ssl.keystore.password", "",
-+        "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
-+            "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
-+             "system property (note the camelCase)."),
-+    HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("hive.zookeeper.ssl.truststore.location", "",
-+        "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
-+            "Overrides any explicit value set via the zookeeper.ssl.trustStore.location" +
-+            "system property (note the camelCase)."),
-+    HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("hive.zookeeper.ssl.truststore.password", "",
-+        "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
-+            "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
-+             "system property (note the camelCase)."),
- 
-     // Transactions
-     HIVE_TXN_MANAGER("hive.txn.manager",
-@@ -4795,14 +4814,18 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
-             "hive.spark.client.rpc.server.address," +
-             "hive.spark.client.rpc.server.port," +
-             "hive.spark.client.rpc.sasl.mechanisms," +
--            "bonecp.,"+
--            "hive.druid.broker.address.default,"+
--            "hive.druid.coordinator.address.default,"+
--            "hikaricp.,"+
--            "hadoop.bin.path,"+
--            "yarn.bin.path,"+
--            "spark.home,"+
--            "hive.driver.parallel.compilation.global.limit",
-+            "bonecp.," +
-+            "hive.druid.broker.address.default," +
-+            "hive.druid.coordinator.address.default," +
-+            "hikaricp.," +
-+            "hadoop.bin.path," +
-+            "yarn.bin.path," +
-+            "spark.home," +
-+            "hive.driver.parallel.compilation.global.limit," +
-+            "hive.zookeeper.ssl.keystore.location," +
-+            "hive.zookeeper.ssl.keystore.password," +
-+            "hive.zookeeper.ssl.truststore.location," +
-+            "hive.zookeeper.ssl.truststore.password",
-         "Comma separated list of configuration options which are immutable at runtime"),
-     HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
-         METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname
-@@ -4817,7 +4840,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
-         + ",fs.s3a.proxy.password"
-         + ",dfs.adls.oauth2.credential"
-         + ",fs.adl.oauth2.credential"
--        + ",fs.azure.account.oauth2.client.secret",
-+        + ",fs.azure.account.oauth2.client.secret"
-+        + ",hive.zookeeper.ssl.keystore.location"
-+        + ",hive.zookeeper.ssl.keystore.password"
-+        + ",hive.zookeeper.ssl.truststore.location"
-+        + ",hive.zookeeper.ssl.truststore.password",
-         "Comma separated list of configuration options which should not be read by normal user like passwords"),
-     HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
-         "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
-@@ -5619,14 +5646,22 @@ public void logVars(PrintStream ps) {
-    * given HiveConf.
-    */
-   public ZooKeeperHiveHelper getZKConfig() {
--    return new ZooKeeperHiveHelper(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM),
--            getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT),
--            getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE),
--            (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
--                    TimeUnit.MILLISECONDS),
--            (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
--                    TimeUnit.MILLISECONDS),
--            getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES));
-+    return ZooKeeperHiveHelper.builder()
-+      .quorum(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM))
-+      .clientPort(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT))
-+      .serverRegistryNameSpace(getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE))
-+      .connectionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
-+          TimeUnit.MILLISECONDS))
-+      .sessionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
-+          TimeUnit.MILLISECONDS))
-+      .baseSleepTime((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
-+          TimeUnit.MILLISECONDS))
-+      .maxRetries(getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
-+      .sslEnabled(getBoolVar(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
-+      .keyStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
-+      .keyStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
-+      .trustStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
-+      .trustStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
-   }
- 
-   public HiveConf() {
-diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
-index 1fc8d36394..02a8926ed5 100644
---- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
-+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
-@@ -22,11 +22,11 @@
- import java.util.ArrayList;
- import java.util.List;
- 
-+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
--import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.KeeperException;
-@@ -50,8 +50,12 @@
-   public String overhead_path = null;
- 
-   public static final String ZK_HOSTS = "templeton.zookeeper.hosts";
--  public static final String ZK_SESSION_TIMEOUT
--    = "templeton.zookeeper.session-timeout";
-+  public static final String ZK_SESSION_TIMEOUT = "templeton.zookeeper.session-timeout";
-+  public static final String ZK_SSL_ENABLE = "templeton.zookeeper.ssl.client.enable";
-+  public static final String ZK_KEYSTORE_LOCATION = "templeton.zookeeper.keystore.location";
-+  public static final String ZK_KEYSTORE_PASSWORD = "templeton.zookeeper.keystore.password";
-+  public static final String ZK_TRUSTSTORE_LOCATION = "templeton.zookeeper.truststore.location";
-+  public static final String ZK_TRUSTSTORE_PASSWORD = "templeton.zookeeper.truststore.password";
- 
-   public static final String ENCODING = "UTF-8";
- 
-@@ -59,27 +63,25 @@
- 
-   private CuratorFramework zk;
- 
--  /**
--   * Open a ZooKeeper connection for the JobState.
--   */
--  public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
--    throws IOException {
--    //do we need to add a connection status listener?  What will that do?
--    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
--    CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
--      CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
--    zk.start();
--    return zk;
--  }
- 
-   /**
-    * Open a ZooKeeper connection for the JobState.
-    */
-   public static CuratorFramework zkOpen(Configuration conf) throws IOException {
--    /*the silly looking call to Builder below is to get the default value of session timeout
--    from Curator which itself exposes it as system property*/
--    return zkOpen(conf.get(ZK_HOSTS),
--      conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
-+    ZooKeeperHiveHelper xkHelper = ZooKeeperHiveHelper.builder()
-+        .quorum(conf.get(ZK_HOSTS))
-+        .sessionTimeout(conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()))
-+        .baseSleepTime(1000)
-+        .maxRetries(3)
-+        .sslEnabled(conf.getBoolean(ZK_SSL_ENABLE, false))
-+        .keyStoreLocation(conf.get(ZK_KEYSTORE_LOCATION, ""))
-+        .keyStorePassword(conf.get(ZK_KEYSTORE_PASSWORD, ""))
-+        .trustStoreLocation(conf.get(ZK_TRUSTSTORE_LOCATION, ""))
-+        .trustStorePassword(conf.get(ZK_TRUSTSTORE_PASSWORD, ""))
-+        .build();
-+    CuratorFramework zk = xkHelper.getNewZookeeperClient();
-+    zk.start();
-+    return zk;
-   }
- 
-   public ZooKeeperStorage() {
-diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
-new file mode 100644
-index 0000000000..084e097be9
---- /dev/null
-+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
-@@ -0,0 +1,35 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+package org.apache.hadoop.hive.metastore.security;
-+
-+import org.junit.BeforeClass;
-+
-+/**
-+ * TestZookeeperTokenStore with zookeeper SSL communication disabled.
-+ */
-+public class TestZookeeperTokenStorePlain extends ZooKeeperTokenStoreTestBase {
-+
-+  public TestZookeeperTokenStorePlain(){
-+    super();
-+  }
-+
-+  @BeforeClass
-+  public static void setUp() throws Exception {
-+    setUpInternal(false);
-+  }
-+}
-diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
-new file mode 100644
-index 0000000000..40179901ce
---- /dev/null
-+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
-@@ -0,0 +1,35 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+package org.apache.hadoop.hive.metastore.security;
-+
-+import org.junit.BeforeClass;
-+
-+/**
-+ * TestZookeeperTokenStore with zookeeper SSL communication enabled.
-+ */
-+public class TestZookeeperTokenStoreSSLEnabled extends ZooKeeperTokenStoreTestBase {
-+
-+  public TestZookeeperTokenStoreSSLEnabled(){
-+    super();
-+  }
-+
-+  @BeforeClass
-+  public static void setUp() throws Exception {
-+    setUpInternal(true);
-+  }
-+}
-diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
-similarity index 76%
-rename from itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
-rename to itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
-index 603155bf8f..35053e70b0 100644
---- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
-+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
-@@ -25,67 +25,84 @@
- 
- 
- import org.apache.curator.framework.CuratorFramework;
--import org.apache.curator.framework.CuratorFrameworkFactory;
--import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.hive.conf.HiveConf;
- import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
--import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
- import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
- import org.apache.hive.testutils.MiniZooKeeperCluster;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.data.ACL;
-+import org.junit.AfterClass;
-+import org.junit.After;
- import org.junit.Assert;
-+import org.junit.Test;
-+
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertNotSame;
- import static org.junit.Assert.assertTrue;
- import static org.junit.Assert.assertNull;
- import static org.junit.Assert.fail;
--import org.junit.Before;
--import org.junit.After;
--import org.junit.Test;
- 
- /**
-  * TestZooKeeperTokenStore.
-  */
--public class TestZooKeeperTokenStore {
-+public abstract class ZooKeeperTokenStoreTestBase {
-+
-+  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
-+  private static final String TRUST_STORE_NAME = "truststore.jks";
-+  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
- 
--  private MiniZooKeeperCluster zkCluster = null;
--  private CuratorFramework zkClient = null;
--  private int zkPort = -1;
--  private ZooKeeperTokenStore ts;
-+  private static MiniZooKeeperCluster zkCluster = null;
-+  private static int zkPort = -1;
-+  private static ZooKeeperTokenStore ts;
-+  private static boolean zkSslEnabled;
- 
--  @Before
--  public void setUp() throws Exception {
-+  public static void setUpInternal(boolean sslEnabled) throws Exception{
-     File zkDataDir = new File(System.getProperty("test.tmp.dir"));
--    if (this.zkCluster != null) {
-+    if (zkCluster != null) {
-       throw new IOException("Cluster already running");
-     }
--    this.zkCluster = new MiniZooKeeperCluster();
--    this.zkPort = this.zkCluster.startup(zkDataDir);
--    this.zkClient =
--        CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort)
--            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
--    this.zkClient.start();
-+    zkCluster = new MiniZooKeeperCluster(sslEnabled);
-+    zkPort = zkCluster.startup(zkDataDir);
-+    zkSslEnabled = sslEnabled;
-+  }
-+
-+  @AfterClass
-+  public static void tearDown() throws Exception{
-+    zkCluster.shutdown();
-+    zkCluster = null;
-   }
- 
-   @After
--  public void tearDown() throws Exception {
--    this.zkClient.close();
-+  public void closeTokenStore() throws Exception{
-     if (ts != null) {
-       ts.close();
-     }
--    this.zkCluster.shutdown();
--    this.zkCluster = null;
-   }
- 
-   private Configuration createConf(String zkPath) {
-     Configuration conf = new Configuration();
-     conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:"
--        + this.zkPort);
-+        + zkPort);
-     conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath);
-+    if(zkSslEnabled) {
-+      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
-+          System.getProperty("test.data.files") :
-+          (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
-+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION,
-+          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
-+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD,
-+          KEY_STORE_TRUST_STORE_PASSWORD);
-+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION,
-+          dataFileDir + File.separator + TRUST_STORE_NAME);
-+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD,
-+          KEY_STORE_TRUST_STORE_PASSWORD);
-+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, "true");
-+
-+    }
-     return conf;
-   }
- 
-@@ -98,6 +115,7 @@ public void testTokenStorage() throws Exception {
-     ts.setConf(conf);
-     ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
- 
-+    CuratorFramework zkClient = ts.getSession();
- 
-     String metastore_zk_path = ZK_PATH + ServerMode.METASTORE;
-     int keySeq = ts.addMasterKey("key1Data");
-@@ -112,6 +130,7 @@ public void testTokenStorage() throws Exception {
- 
-     ts.removeMasterKey(keySeq);
-     assertEquals("expected number keys", 1, ts.getMasterKeys().length);
-+    ts.removeMasterKey(keySeq2);
- 
-     // tokens
-     DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(
-@@ -189,6 +208,8 @@ public void testAclPositive() throws Exception {
-     ts = new ZooKeeperTokenStore();
-     ts.setConf(conf);
-     ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
-+
-+    CuratorFramework zkClient = ts.getSession();
-     List<ACL> acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE);
-     assertEquals(2, acl.size());
-   }
-diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
-index cc32a7e2b8..596c3d6fc1 100644
---- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
-+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
-@@ -111,6 +111,10 @@ public static void startServices() throws Exception {
-     addToExpectedRestrictedMap("spark.home");
-     addToExpectedRestrictedMap("hive.privilege.synchronizer.interval");
-     addToExpectedRestrictedMap("hive.driver.parallel.compilation.global.limit");
-+    addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.location");
-+    addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.password");
-+    addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.location");
-+    addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.password");
-   }
- 
-   @AfterClass
-diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
-index bd5e811743..3322434cb6 100644
---- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
-+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
-@@ -22,11 +22,10 @@
- import com.google.common.collect.Collections2;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
--import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
-+import org.apache.curator.framework.recipes.nodes.PersistentNode;
- import org.apache.curator.retry.RetryOneTime;
- import org.apache.curator.test.TestingServer;
- import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
--import org.apache.hadoop.hive.conf.HiveConf;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.KeeperException;
- import org.junit.After;
-@@ -165,9 +164,8 @@ private void publishConfsToZk(Map<String, String> confs, String uri) throws Exce
-     // Publish configs for this instance as the data on the node
-     znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confs);
-     byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
--    PersistentEphemeralNode znode =
--      new PersistentEphemeralNode(client,
--        PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
-+    PersistentNode znode = new PersistentNode(client, CreateMode.EPHEMERAL_SEQUENTIAL,
-+        false, pathPrefix, znodeDataUTF8);
-     znode.start();
-     // We'll wait for 120s for node creation
-     long znodeCreationTimeout = 120;
-diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
-similarity index 95%
-rename from itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
-rename to itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
-index de2e4937a8..7302e0993a 100644
---- itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
-+++ itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
-@@ -19,6 +19,7 @@
- package org.apache.hive.service.server;
- 
- import java.io.File;
-+import java.io.IOException;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.Iterator;
-@@ -54,14 +55,14 @@
- import org.apache.hive.service.cli.OperationHandle;
- import org.apache.hive.service.cli.RowSet;
- import org.apache.hive.service.cli.SessionHandle;
-+import org.junit.AfterClass;
- import org.junit.Assert;
--import org.junit.BeforeClass;
- import org.junit.Test;
- 
- /**
-  * Test restricted information schema with privilege synchronization
-  */
--public class TestInformationSchemaWithPrivilege {
-+public abstract class InformationSchemaWithPrivilegeTestBase {
- 
-   // Group mapping:
-   // group_a: user1, user2
-@@ -175,14 +176,18 @@ public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory metastoreC
-     }
-   }
- 
-+  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
-+  private static final String TRUST_STORE_NAME = "truststore.jks";
-+  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
-+
-   private static MiniHS2 miniHS2 = null;
-   private static MiniZooKeeperCluster zkCluster = null;
-   private static Map<String, String> confOverlay;
- 
--  @BeforeClass
--  public static void beforeTest() throws Exception {
-+
-+  public static void setupInternal(boolean zookeeperSSLEnabled) throws Exception {
-     File zkDataDir = new File(System.getProperty("test.tmp.dir"));
--    zkCluster = new MiniZooKeeperCluster();
-+    zkCluster = new MiniZooKeeperCluster(zookeeperSSLEnabled);
-     int zkPort = zkCluster.startup(zkDataDir);
- 
-     miniHS2 = new MiniHS2(new HiveConf());
-@@ -206,9 +211,34 @@ public static void beforeTest() throws Exception {
-     confOverlay.put(ConfVars.HIVE_AUTHENTICATOR_MANAGER.varname, FakeGroupAuthenticator.class.getName());
-     confOverlay.put(ConfVars.HIVE_AUTHORIZATION_ENABLED.varname, "true");
-     confOverlay.put(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST.varname, ".*");
-+
-+    if(zookeeperSSLEnabled) {
-+      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
-+          System.getProperty("test.data.files") :
-+          (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
-+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION.varname,
-+          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
-+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
-+          KEY_STORE_TRUST_STORE_PASSWORD);
-+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION.varname,
-+          dataFileDir + File.separator + TRUST_STORE_NAME);
-+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
-+          KEY_STORE_TRUST_STORE_PASSWORD);
-+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE.varname, "true");
-+    }
-     miniHS2.start(confOverlay);
-   }
- 
-+  @AfterClass
-+  public static void tearDown() throws IOException {
-+    if (miniHS2 != null) {
-+      miniHS2.stop();
-+    }
-+    if (zkCluster != null) {
-+      zkCluster.shutdown();
-+    }
-+  }
-+
-   @Test
-   public void test() throws Exception {
- 
-diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
-new file mode 100644
-index 0000000000..ffa1843ae2
---- /dev/null
-+++ itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
-@@ -0,0 +1,35 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+package org.apache.hive.service.server;
-+
-+import org.junit.BeforeClass;
-+
-+/**
-+ * Test restricted information schema with privilege synchronization with Zookeeper SSL communication disabled.
-+ */
-+public class TestInformationSchemaWithPrivilegeZookeeperPlain extends InformationSchemaWithPrivilegeTestBase {
-+
-+  public TestInformationSchemaWithPrivilegeZookeeperPlain() {
-+    super();
-+  }
-+
-+  @BeforeClass
-+  public static void setUp() throws Exception{
-+    setupInternal(false);
-+  }
-+}
-diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
-new file mode 100644
-index 0000000000..e12f4948d4
---- /dev/null
-+++ itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
-@@ -0,0 +1,35 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+package org.apache.hive.service.server;
-+
-+import org.junit.BeforeClass;
-+
-+/**
-+ * Test restricted information schema with privilege synchronization with Zookeeper SSL communication enabled.
-+ */
-+public class TestInformationSchemaWithPrivilegeZookeeperSSL extends InformationSchemaWithPrivilegeTestBase {
-+
-+  public TestInformationSchemaWithPrivilegeZookeeperSSL() {
-+    super();
-+  }
-+
-+  @BeforeClass
-+  public static void setUp() throws Exception{
-+    setupInternal(true);
-+  }
-+}
-diff --git jdbc/src/java/org/apache/hive/jdbc/Utils.java jdbc/src/java/org/apache/hive/jdbc/Utils.java
-index e23826eb56..6cb6853077 100644
---- jdbc/src/java/org/apache/hive/jdbc/Utils.java
-+++ jdbc/src/java/org/apache/hive/jdbc/Utils.java
-@@ -116,6 +116,11 @@
-     public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
-     public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA";
-     public static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
-+    public static final String ZOOKEEPER_SSL_ENABLE = "zooKeeperSSLEnable";
-+    public static final String ZOOKEEPER_KEYSTORE_LOCATION = "zooKeeperKeystoreLocation";
-+    public static final String ZOOKEEPER_KEYSTORE_PASSWORD= "zooKeeperKeystorePassword";
-+    public static final String ZOOKEEPER_TRUSTSTORE_LOCATION  = "zooKeeperTruststoreLocation";
-+    public static final String ZOOKEEPER_TRUSTSTORE_PASSWORD = "zooKeeperTruststorePassword";
-     // Default namespace value on ZooKeeper.
-     // This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
-     static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
-@@ -168,6 +173,11 @@
-     private boolean isEmbeddedMode = false;
-     private String suppliedURLAuthority;
-     private String zooKeeperEnsemble = null;
-+    private boolean zooKeeperSslEnabled = false;
-+    private String zookeeperKeyStoreLocation = "";
-+    private String zookeeperKeyStorePassword = "";
-+    private String zookeeperTrustStoreLocation = "";
-+    private String zookeeperTrustStorePassword = "";
-     private String currentHostZnodePath;
-     private final List<String> rejectedHostZnodePaths = new ArrayList<String>();
- 
-@@ -185,6 +195,12 @@ public JdbcConnectionParams(JdbcConnectionParams params) {
-       this.isEmbeddedMode = params.isEmbeddedMode;
-       this.suppliedURLAuthority = params.suppliedURLAuthority;
-       this.zooKeeperEnsemble = params.zooKeeperEnsemble;
-+      this.zooKeeperSslEnabled = params.zooKeeperSslEnabled;
-+      this.zookeeperKeyStoreLocation = params.zookeeperKeyStoreLocation;
-+      this.zookeeperKeyStorePassword = params.zookeeperKeyStorePassword;
-+      this.zookeeperTrustStoreLocation = params.zookeeperTrustStoreLocation;
-+      this.zookeeperTrustStorePassword = params.zookeeperTrustStorePassword;
-+
-       this.currentHostZnodePath = params.currentHostZnodePath;
-       this.rejectedHostZnodePaths.addAll(rejectedHostZnodePaths);
-     }
-@@ -228,6 +244,25 @@ public String getSuppliedURLAuthority() {
-     public String getZooKeeperEnsemble() {
-       return zooKeeperEnsemble;
-     }
-+    public boolean isZooKeeperSslEnabled() {
-+      return zooKeeperSslEnabled;
-+    }
-+
-+    public String getZookeeperKeyStoreLocation() {
-+      return zookeeperKeyStoreLocation;
-+    }
-+
-+    public String getZookeeperKeyStorePassword() {
-+      return zookeeperKeyStorePassword;
-+    }
-+
-+    public String getZookeeperTrustStoreLocation() {
-+      return zookeeperTrustStoreLocation;
-+    }
-+
-+    public String getZookeeperTrustStorePassword() {
-+      return zookeeperTrustStorePassword;
-+    }
- 
-     public List<String> getRejectedHostZnodePaths() {
-       return rejectedHostZnodePaths;
-@@ -277,6 +312,26 @@ public void setZooKeeperEnsemble(String zooKeeperEnsemble) {
-       this.zooKeeperEnsemble = zooKeeperEnsemble;
-     }
- 
-+    public void setZooKeeperSslEnabled(boolean zooKeeperSslEnabled) {
-+      this.zooKeeperSslEnabled = zooKeeperSslEnabled;
-+    }
-+
-+    public void setZookeeperKeyStoreLocation(String zookeeperKeyStoreLocation) {
-+      this.zookeeperKeyStoreLocation = zookeeperKeyStoreLocation;
-+    }
-+
-+    public void setZookeeperKeyStorePassword(String zookeeperKeyStorePassword) {
-+      this.zookeeperKeyStorePassword = zookeeperKeyStorePassword;
-+    }
-+
-+    public void setZookeeperTrustStoreLocation(String zookeeperTrustStoreLocation) {
-+      this.zookeeperTrustStoreLocation = zookeeperTrustStoreLocation;
-+    }
-+
-+    public void setZookeeperTrustStorePassword(String zookeeperTrustStorePassword) {
-+      this.zookeeperTrustStorePassword = zookeeperTrustStorePassword;
-+    }
-+
-     public void setCurrentHostZnodePath(String currentHostZnodePath) {
-       this.currentHostZnodePath = currentHostZnodePath;
-     }
-@@ -485,6 +540,7 @@ public static JdbcConnectionParams extractURLComponents(String uri, Properties i
-         uri = uri.replace(dummyAuthorityString, authorityStr);
-         // Set ZooKeeper ensemble in connParams for later use
-         connParams.setZooKeeperEnsemble(authorityStr);
-+        ZooKeeperHiveClientHelper.setZkSSLParams(connParams);
-       } else {
-         URI jdbcBaseURI = URI.create(URI_HIVE_PREFIX + "//" + authorityStr);
-         // Check to prevent unintentional use of embedded mode. A missing "/"
-@@ -576,7 +632,6 @@ private static void handleParamDeprecation(Map<String, String> fromMap, Map<Stri
-    * host:port pairs.
-    *
-    * @param uri
--   * @param connParams
-    * @return
-    * @throws JdbcUriParseException
-    */
-diff --git jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
-index 759ba8a5ef..3d89fa223a 100644
---- jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
-+++ jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
-@@ -32,6 +32,7 @@
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.curator.utils.ZKPaths;
- import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.hive.common.SSLZookeeperFactory;
- import org.apache.hadoop.hive.conf.HiveConf;
- import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
- import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
-@@ -85,11 +86,40 @@ public static boolean isZkDynamicDiscoveryMode(Map<String, String> sessionConf)
-       JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode));
-   }
- 
-+  /**
-+   * Parse and set up the SSL communication related Zookeeper params in connParams from sessionVars.
-+   * @param connParams
-+   */
-+  public static void setZkSSLParams(JdbcConnectionParams connParams) {
-+    Map<String, String> sessionConf = connParams.getSessionVars();
-+    boolean sslEnabled = false;
-+    if (sessionConf.containsKey(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE)) {
-+      sslEnabled = Boolean.parseBoolean(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE));
-+      connParams.setZooKeeperSslEnabled(sslEnabled);
-+    }
-+    if (sslEnabled) {
-+      connParams.setZookeeperKeyStoreLocation(
-+          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_LOCATION), ""));
-+      connParams.setZookeeperKeyStorePassword(
-+          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_PASSWORD), ""));
-+      connParams.setZookeeperTrustStoreLocation(
-+          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_LOCATION), ""));
-+      connParams.setZookeeperTrustStorePassword(
-+          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_PASSWORD), ""));
-+    }
-+  }
-+
-   private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception {
-     String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
-     CuratorFramework zooKeeperClient =
--        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
--            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
-+        CuratorFrameworkFactory.builder()
-+            .connectString(zooKeeperEnsemble)
-+            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
-+            .zookeeperFactory(
-+            new SSLZookeeperFactory(connParams.isZooKeeperSslEnabled(), connParams.getZookeeperKeyStoreLocation(),
-+                connParams.getZookeeperKeyStorePassword(), connParams.getZookeeperTrustStoreLocation(),
-+                connParams.getZookeeperTrustStorePassword()))
-+            .build();
-     zooKeeperClient.start();
-     return zooKeeperClient;
-   }
-diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
-index d28fd1778c..2b21baaea4 100644
---- llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
-+++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
-@@ -30,20 +30,18 @@
- import java.util.concurrent.locks.ReentrantLock;
- 
- import org.apache.curator.framework.CuratorFramework;
--import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.api.ACLProvider;
- import org.apache.curator.framework.imps.CuratorFrameworkState;
- import org.apache.curator.framework.recipes.cache.ChildData;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
--import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
--import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
-+import org.apache.curator.framework.recipes.nodes.PersistentNode;
- import org.apache.curator.framework.state.ConnectionState;
- import org.apache.curator.framework.state.ConnectionStateListener;
--import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.curator.utils.CloseableUtils;
- import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
- import org.apache.hadoop.hive.conf.HiveConf;
- import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
- import org.apache.hadoop.hive.llap.LlapUtil;
-@@ -55,6 +53,7 @@
- import org.apache.hadoop.registry.client.types.ServiceRecord;
- import org.apache.hadoop.security.UserGroupInformation;
- import org.apache.hadoop.yarn.conf.YarnConfiguration;
-+import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.KeeperException.InvalidACLException;
- import org.apache.zookeeper.KeeperException.NodeExistsException;
- import org.apache.zookeeper.ZooDefs;
-@@ -111,7 +110,7 @@
-   private final Map<String, Set<InstanceType>> nodeToInstanceCache;
- 
-   // The registration znode.
--  private PersistentEphemeralNode znode;
-+  private PersistentNode znode;
-   private String znodePath; // unique identity for this instance
- 
-   private PathChildrenCache instancesCache; // Created on demand.
-@@ -212,28 +211,23 @@ private ACLProvider getACLProviderForZKPath(String zkPath) {
-   }
- 
-   private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
--    String zkEnsemble = getQuorumServers(conf);
--    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
--      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
--    int connectionTimeout = (int) HiveConf.getTimeVar(conf,
--      ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
--    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
--      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
--    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
--
--    LOG.info("Creating curator client with connectString: {} sessionTimeoutMs: {} connectionTimeoutMs: {}" +
--      " namespace: {} exponentialBackoff - sleepTime: {} maxRetries: {}", zkEnsemble, sessionTimeout,
--      connectionTimeout, namespace, baseSleepTime, maxRetries);
--    // Create a CuratorFramework instance to be used as the ZooKeeper client
--    // Use the zooKeeperAclProvider to create appropriate ACLs
--    return CuratorFrameworkFactory.builder()
--      .connectString(zkEnsemble)
--      .sessionTimeoutMs(sessionTimeout)
--      .connectionTimeoutMs(connectionTimeout)
--      .aclProvider(zooKeeperAclProvider)
--      .namespace(namespace)
--      .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
--      .build();
-+    return ZooKeeperHiveHelper.builder()
-+        .quorum(conf.get(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname))
-+        .clientPort(conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
-+            ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()))
-+        .connectionTimeout(
-+            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
-+        .sessionTimeout(
-+            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS))
-+        .baseSleepTime(
-+            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS))
-+        .maxRetries(HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
-+        .sslEnabled(HiveConf.getBoolVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
-+        .keyStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
-+        .keyStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
-+        .trustStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
-+        .trustStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD))
-+        .build().getNewZookeeperClient(zooKeeperAclProvider, namespace);
-   }
- 
-   private static List<ACL> createSecureAcls() {
-@@ -283,9 +277,9 @@ protected final String registerServiceRecord(ServiceRecord srv, final String uni
- 
-     // Create a znode under the rootNamespace parent for this instance of the server
-     try {
--      // PersistentEphemeralNode will make sure the ephemeral node created on server will be present
-+      // PersistentNode will make sure the ephemeral node created on server will be present
-       // even under connection or session interruption (will automatically handle retries)
--      znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
-+      znode = new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false,
-           workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
-     
-       // start the creation of znodes
-diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
-index fa3a382367..e8eaac0fd9 100644
---- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
-+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
-@@ -18,14 +18,11 @@
- 
- package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
- 
--import java.util.concurrent.TimeUnit;
- 
- import org.apache.hive.common.util.ShutdownHookManager;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.apache.curator.framework.CuratorFramework;
--import org.apache.curator.framework.CuratorFrameworkFactory;
--import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.hadoop.hive.conf.HiveConf;
- 
- public class CuratorFrameworkSingleton {
-@@ -50,15 +47,8 @@ public static synchronized CuratorFramework getInstance(HiveConf hiveConf) {
-       } else {
-         conf = hiveConf;
-       }
--      int sessionTimeout =  (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
--      int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
--      int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
--      String quorumServers = conf.getZKConfig().getQuorumServers();
- 
--      sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers)
--          .sessionTimeoutMs(sessionTimeout)
--          .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
--          .build();
-+      sharedClient = conf.getZKConfig().getNewZookeeperClient();
-       sharedClient.start();
-     }
- 
-diff --git ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
-index eec628263a..dc11ae1611 100644
---- ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
-+++ ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
-@@ -34,7 +34,9 @@
- 
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HConstants;
--import org.apache.zookeeper.server.NIOServerCnxnFactory;
-+import org.apache.zookeeper.common.ClientX509Util;
-+import org.apache.zookeeper.common.X509Util;
-+import org.apache.zookeeper.server.ServerCnxnFactory;
- import org.apache.zookeeper.server.ZooKeeperServer;
- import org.apache.zookeeper.server.persistence.FileTxnLog;
- import org.slf4j.Logger;
-@@ -54,6 +56,9 @@
- 
-   private static final int TICK_TIME = 2000;
-   private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
-+  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
-+  private static final String TRUST_STORE_NAME = "truststore.jks";
-+  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
-   private int connectionTimeout;
- 
-   private boolean started;
-@@ -61,7 +66,7 @@
-   /** The default port. If zero, we use a random port. */
-   private int defaultClientPort = 0;
- 
--  private List<NIOServerCnxnFactory> standaloneServerFactoryList;
-+  private List<ServerCnxnFactory> standaloneServerFactoryList;
-   private List<ZooKeeperServer> zooKeeperServers;
-   private List<Integer> clientPortList;
- 
-@@ -70,11 +75,20 @@
- 
-   private Configuration configuration;
- 
-+  private boolean sslEnabled = false;
-+
-   public MiniZooKeeperCluster() {
-     this(new Configuration());
-   }
--
-+  public MiniZooKeeperCluster(boolean sslEnabled) {
-+    this(new Configuration(), sslEnabled);
-+  }
-   public MiniZooKeeperCluster(Configuration configuration) {
-+    this(configuration, false);
-+  }
-+
-+  public MiniZooKeeperCluster(Configuration configuration, boolean sslEnabled) {
-+    this.sslEnabled = sslEnabled;
-     this.started = false;
-     this.configuration = configuration;
-     activeZKServerIndex = -1;
-@@ -167,7 +181,7 @@ public int getZooKeeperServerNum() {
-   }
- 
-   // / XXX: From o.a.zk.t.ClientBase
--  private static void setupTestEnv() {
-+  private static void setupTestEnv(boolean sslEnabled) {
-     // With ZooKeeper 3.5 we need to whitelist the 4 letter commands we use
-     System.setProperty("zookeeper.4lw.commands.whitelist", "*");
- 
-@@ -176,6 +190,9 @@ private static void setupTestEnv() {
-     // resulting in test failure (client timeout on first session).
-     // set env and directly in order to handle static init/gc issues
-     System.setProperty("zookeeper.preAllocSize", "100");
-+    if (sslEnabled) {
-+      System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
-+    }
-     FileTxnLog.setPreallocSize(100 * 1024);
-   }
- 
-@@ -200,7 +217,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
-       return -1;
-     }
- 
--    setupTestEnv();
-+    setupTestEnv(sslEnabled);
-     shutdown();
- 
-     int tentativePort = -1; // the seed port
-@@ -229,12 +246,10 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
-       // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
-       server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
-       server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
--      NIOServerCnxnFactory standaloneServerFactory;
-+      ServerCnxnFactory standaloneServerFactory;
-       while (true) {
-         try {
--          standaloneServerFactory = new NIOServerCnxnFactory();
--          standaloneServerFactory.configure(new InetSocketAddress(currentClientPort),
--              configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
-+          standaloneServerFactory = createServerCnxnFactory(currentClientPort);
-         } catch (BindException e) {
-           LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
-           // We're told to use some port but it's occupied, fail
-@@ -252,7 +267,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
-       // Start up this ZK server
-       standaloneServerFactory.startup(server);
-       // Runs a 'stat' against the servers.
--      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
-+      if (!sslEnabled && !waitForServerUp(currentClientPort, connectionTimeout)) {
-         throw new IOException("Waiting for startup of standalone server");
-       }
- 
-@@ -276,6 +291,36 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
-     return clientPort;
-   }
- 
-+  private ServerCnxnFactory createServerCnxnFactory(int currentClientPort) throws IOException {
-+    ServerCnxnFactory serverCnxnFactory = null;
-+    if (sslEnabled) {
-+      System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
-+          "org.apache.zookeeper.server.NettyServerCnxnFactory");
-+      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
-+              System.getProperty("test.data.files") :
-+              configuration.get("test.data.files").replace('\\', '/').replace("c:", "");
-+      X509Util x509Util = new ClientX509Util();
-+      System.setProperty(x509Util.getSslKeystoreLocationProperty(),
-+          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
-+      System.setProperty(x509Util.getSslKeystorePasswdProperty(),
-+          KEY_STORE_TRUST_STORE_PASSWORD);
-+      System.setProperty(x509Util.getSslTruststoreLocationProperty(),
-+          dataFileDir + File.separator + TRUST_STORE_NAME);
-+      System.setProperty(x509Util.getSslTruststorePasswdProperty(),
-+          KEY_STORE_TRUST_STORE_PASSWORD);
-+      serverCnxnFactory = ServerCnxnFactory.createFactory();
-+      serverCnxnFactory.configure(new InetSocketAddress(currentClientPort),
-+          configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS),
-+          true);
-+    } else {
-+      serverCnxnFactory = ServerCnxnFactory.createFactory();
-+      serverCnxnFactory.configure(new InetSocketAddress(currentClientPort),
-+          configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
-+    }
-+    return serverCnxnFactory;
-+  }
-+
-+
-   private void createDir(File dir) throws IOException {
-     try {
-       if (!dir.exists()) {
-@@ -292,7 +337,7 @@ private void createDir(File dir) throws IOException {
-   public void shutdown() throws IOException {
-     // shut down all the zk servers
-     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
--      NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
-+      ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
-       int clientPort = clientPortList.get(i);
- 
-       standaloneServerFactory.shutdown();
-@@ -305,6 +350,7 @@ public void shutdown() throws IOException {
-     for (ZooKeeperServer zkServer : zooKeeperServers) {
-       //explicitly close ZKDatabase since ZookeeperServer does not close them
-       zkServer.getZKDatabase().close();
-+      zkServer.shutdown(true);
-     }
-     zooKeeperServers.clear();
- 
-@@ -328,7 +374,7 @@ public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedExc
-     }
- 
-     // Shutdown the current active one
--    NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
-+    ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
-     int clientPort = clientPortList.get(activeZKServerIndex);
- 
-     standaloneServerFactory.shutdown();
-@@ -366,7 +412,7 @@ public void killOneBackupZooKeeperServer() throws IOException, InterruptedExcept
- 
-     int backupZKServerIndex = activeZKServerIndex + 1;
-     // Shutdown the current active one
--    NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
-+    ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
-     int clientPort = clientPortList.get(backupZKServerIndex);
- 
-     standaloneServerFactory.shutdown();
-diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java
-index fece82e266..181ea5d6d5 100644
---- service/src/java/org/apache/hive/service/server/HiveServer2.java
-+++ service/src/java/org/apache/hive/service/server/HiveServer2.java
-@@ -43,14 +43,12 @@
- import org.apache.commons.lang3.StringUtils;
- import org.apache.commons.lang3.concurrent.BasicThreadFactory;
- import org.apache.curator.framework.CuratorFramework;
--import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.api.ACLProvider;
- import org.apache.curator.framework.api.BackgroundCallback;
- import org.apache.curator.framework.api.CuratorEvent;
- import org.apache.curator.framework.api.CuratorEventType;
- import org.apache.curator.framework.recipes.leader.LeaderLatch;
- import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
--import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hive.common.JvmPauseMonitor;
- import org.apache.hadoop.hive.common.LogUtils;
-@@ -1078,14 +1076,9 @@ private void maybeStartCompactorThreads(HiveConf hiveConf) throws Exception {
-    */
-   static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
-     HiveConf hiveConf = new HiveConf();
--    String zooKeeperEnsemble = hiveConf.getZKConfig().getQuorumServers();
--    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
--    int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
--    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
--    CuratorFramework zooKeeperClient =
--        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
--            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
-+    CuratorFramework zooKeeperClient = hiveConf.getZKConfig().getNewZookeeperClient();
-     zooKeeperClient.start();
-+    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
-     List<String> znodePaths =
-         zooKeeperClient.getChildren().forPath(
-             ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
-diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
-new file mode 100644
-index 0000000000..ee01731fa9
---- /dev/null
-+++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
-@@ -0,0 +1,78 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+
-+package org.apache.hadoop.hive.common;
-+
-+import org.apache.commons.lang3.StringUtils;
-+import org.apache.curator.utils.ZookeeperFactory;
-+import org.apache.zookeeper.Watcher;
-+import org.apache.zookeeper.ZooKeeper;
-+import org.apache.zookeeper.client.ZKClientConfig;
-+import org.apache.zookeeper.common.ClientX509Util;
-+import org.slf4j.Logger;
-+import org.slf4j.LoggerFactory;
-+
-+/**
-+ * Factory to create Zookeeper clients with the zookeeper.client.secure enabled,
-+ * allowing SSL communication with the Zookeeper server.
-+ */
-+public class SSLZookeeperFactory implements ZookeeperFactory {
-+
-+  private static final Logger LOG = LoggerFactory.getLogger(SSLZookeeperFactory.class);
-+
-+  private boolean sslEnabled;
-+  private String keyStoreLocation;
-+  private String keyStorePassword;
-+  private String trustStoreLocation;
-+  private String trustStorePassword;
-+
-+  public SSLZookeeperFactory(boolean sslEnabled, String keyStoreLocation, String keyStorePassword,
-+      String trustStoreLocation, String trustStorePassword) {
-+
-+    this.sslEnabled = sslEnabled;
-+    this.keyStoreLocation = keyStoreLocation;
-+    this.keyStorePassword = keyStorePassword;
-+    this.trustStoreLocation = trustStoreLocation;
-+    this.trustStorePassword = trustStorePassword;
-+    if (sslEnabled) {
-+      if (StringUtils.isEmpty(keyStoreLocation)) {
-+        LOG.warn("Missing keystoreLocation parameter");
-+      }
-+      if (StringUtils.isEmpty(trustStoreLocation)) {
-+        LOG.warn("Missing trustStoreLocation parameter");
-+      }
-+    }
-+  }
-+
-+  @Override
-+  public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
-+      boolean canBeReadOnly) throws Exception {
-+    if (!this.sslEnabled) {
-+      return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-+    }
-+    ZKClientConfig clientConfig = new ZKClientConfig();
-+    clientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
-+    clientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
-+    ClientX509Util x509Util = new ClientX509Util();
-+    clientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(), this.keyStoreLocation);
-+    clientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(), this.keyStorePassword);
-+    clientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(), this.trustStoreLocation);
-+    clientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(), this.trustStorePassword);
-+    return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, clientConfig);
-+  }
-+}
-diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
-index 99f7c97877..71d8651712 100644
---- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
-+++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
-@@ -23,10 +23,11 @@
- import java.util.ArrayList;
- import java.util.concurrent.TimeUnit;
- import java.util.List;
-+
- import org.apache.curator.framework.api.ACLProvider;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.CuratorFramework;
--import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
-+import org.apache.curator.framework.recipes.nodes.PersistentNode;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.KeeperException;
-@@ -55,28 +56,164 @@
-   public static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHiveHelper.class.getName());
-   public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
- 
--  private String quorum = null;
--  private String clientPort = null;
--  private String rootNamespace = null;
--  private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
-+  /**
-+   * ZooKeeperHiveHelperBuilder. A builder class to initialize ZooKeeperHiveHelper.
-+   */
-+  public static class ZooKeeperHiveHelperBuilder {
-+    private String quorum = null;
-+    private String clientPort = null;
-+    private String serverRegistryNameSpace = null;
-+    private int connectionTimeout;
-+    private int sessionTimeout;
-+    private int baseSleepTime;
-+    private int maxRetries;
-+    private boolean sslEnabled = false;
-+    private String keyStoreLocation = null;
-+    private String keyStorePassword = null;
-+    private String trustStoreLocation = null;
-+    private String trustStorePassword = null;
-+
-+    public ZooKeeperHiveHelper build() {
-+      return new ZooKeeperHiveHelper(this);
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder quorum(String quorum) {
-+      this.quorum = quorum;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder clientPort(String clientPort) {
-+      this.clientPort = clientPort;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder serverRegistryNameSpace(String serverRegistryNameSpace) {
-+      this.serverRegistryNameSpace = serverRegistryNameSpace;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder connectionTimeout(int connectionTimeout) {
-+      this.connectionTimeout = connectionTimeout;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder sessionTimeout(int sessionTimeout) {
-+      this.sessionTimeout = sessionTimeout;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder baseSleepTime(int baseSleepTime) {
-+      this.baseSleepTime = baseSleepTime;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder maxRetries(int maxRetries) {
-+      this.maxRetries = maxRetries;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder sslEnabled(boolean sslEnabled) {
-+      this.sslEnabled = sslEnabled;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder keyStoreLocation(String keyStoreLocation) {
-+      this.keyStoreLocation = keyStoreLocation;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder keyStorePassword(String keyStorePassword) {
-+      this.keyStorePassword = keyStorePassword;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder trustStoreLocation(String trustStoreLocation) {
-+      this.trustStoreLocation = trustStoreLocation;
-+      return this;
-+    }
-+
-+    public ZooKeeperHiveHelperBuilder trustStorePassword(String trustStorePassword) {
-+      this.trustStorePassword = trustStorePassword;
-+      return this;
-+    }
-+
-+    public String getQuorum() {
-+      return quorum;
-+    }
-+
-+    public String getClientPort() {
-+      return clientPort;
-+    }
-+
-+    public String getServerRegistryNameSpace() {
-+      return serverRegistryNameSpace;
-+    }
-+
-+    public int getConnectionTimeout() {
-+      return connectionTimeout;
-+    }
-+
-+    public int getSessionTimeout() {
-+      return sessionTimeout;
-+    }
-+
-+    public int getBaseSleepTime() {
-+      return baseSleepTime;
-+    }
-+
-+    public int getMaxRetries() {
-+      return maxRetries;
-+    }
-+
-+    public boolean isSslEnabled() {
-+      return sslEnabled;
-+    }
-+
-+    public String getKeyStoreLocation() {
-+      return keyStoreLocation;
-+    }
-+
-+    public String getKeyStorePassword() {
-+      return keyStorePassword;
-+    }
-+
-+    public String getTrustStoreLocation() {
-+      return trustStoreLocation;
-+    }
-+
-+    public String getTrustStorePassword() {
-+      return trustStorePassword;
-+    }
-+  }
-+
-+  public static ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder builder() {
-+    return new ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder();
-+  }
-+
-+  private String quorum;
-+  private String rootNamespace;
-+  private int connectionTimeout;
-   private int sessionTimeout;
-   private int baseSleepTime;
-   private int maxRetries;
-+  private boolean sslEnabled;
- 
-+  private SSLZookeeperFactory sslZookeeperFactory;
-   private CuratorFramework zooKeeperClient;
--  private PersistentEphemeralNode znode;
-+  private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
-+  private PersistentNode znode;
-+
- 
--  public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespace,
--                             int sessionTimeout, int baseSleepTime, int maxRetries) {
-+  public ZooKeeperHiveHelper(ZooKeeperHiveHelperBuilder builder) {
-     // Get the ensemble server addresses in the format host1:port1, host2:port2, ... . Append
-     // the configured port to hostname if the hostname doesn't contain a port.
--    String[] hosts = quorum.split(",");
-+    String[] hosts = builder.getQuorum().split(",");
-     StringBuilder quorumServers = new StringBuilder();
-     for (int i = 0; i < hosts.length; i++) {
-       quorumServers.append(hosts[i].trim());
-       if (!hosts[i].contains(":")) {
-         quorumServers.append(":");
--        quorumServers.append(clientPort);
-+        quorumServers.append(builder.getClientPort());
-       }
- 
-       if (i != hosts.length - 1) {
-@@ -85,11 +222,19 @@ public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespac
-     }
- 
-     this.quorum = quorumServers.toString();
--    this.clientPort = clientPort;
--    this.rootNamespace = rootNamespace;
--    this.sessionTimeout = sessionTimeout;
--    this.baseSleepTime = baseSleepTime;
--    this.maxRetries = maxRetries;
-+    this.rootNamespace = builder.getServerRegistryNameSpace();
-+    this.connectionTimeout = builder.getConnectionTimeout();
-+    this.sessionTimeout = builder.getSessionTimeout();
-+    this.baseSleepTime = builder.getBaseSleepTime();
-+    this.maxRetries = builder.getMaxRetries();
-+    this.sslEnabled = builder.isSslEnabled();
-+    this.sslZookeeperFactory =
-+        new SSLZookeeperFactory(sslEnabled,
-+            builder.getKeyStoreLocation(),
-+            builder.getKeyStorePassword(),
-+            builder.getTrustStoreLocation(),
-+            builder.getTrustStorePassword());
-+
-   }
- 
-   /**
-@@ -118,8 +263,7 @@ public void addServerInstanceToZooKeeper(String znodePathPrefix, String znodeDat
-                       + ZOOKEEPER_PATH_SEPARATOR + znodePathPrefix;
-       byte[] znodeDataUTF8 = znodeData.getBytes(StandardCharsets.UTF_8);
-       znode =
--              new PersistentEphemeralNode(zooKeeperClient,
--                      PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
-+              new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false, pathPrefix, znodeDataUTF8);
-       znode.start();
-       // We'll wait for 120s for node creation
-       long znodeCreationTimeout = 120;
-@@ -147,17 +291,7 @@ public void addServerInstanceToZooKeeper(String znodePathPrefix, String znodeDat
- 
-   public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
-                                                boolean addParentNode) throws Exception {
--    String zooKeeperEnsemble = getQuorumServers();
--    // Create a CuratorFramework instance to be used as the ZooKeeper client.
--    // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
--    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
--            .connectString(zooKeeperEnsemble)
--            .sessionTimeoutMs(sessionTimeout)
--            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
--    if (zooKeeperAclProvider != null) {
--      builder = builder.aclProvider(zooKeeperAclProvider);
--    }
--    CuratorFramework zkClient = builder.build();
-+    CuratorFramework zkClient = getNewZookeeperClient(zooKeeperAclProvider);
-     zkClient.start();
- 
-     // Create the parent znodes recursively; ignore if the parent already exists.
-@@ -177,6 +311,39 @@ public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
-     }
-     return zkClient;
-   }
-+  public CuratorFramework getNewZookeeperClient() {
-+    return getNewZookeeperClient(null, null);
-+  }
-+  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider) {
-+    return getNewZookeeperClient(zooKeeperAclProvider, null);
-+  }
-+
-+  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider, String nameSpace) {
-+    LOG.info("Creating curator client with connectString: {} namespace: {} sessionTimeoutMs: {}" +
-+            " connectionTimeoutMs: {} exponentialBackoff - sleepTime: {} maxRetries: {} sslEnabled: {}",
-+        quorum, nameSpace,  sessionTimeout,
-+        connectionTimeout, baseSleepTime, maxRetries, sslEnabled);
-+    // Create a CuratorFramework instance to be used as the ZooKeeper client.
-+    // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
-+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-+            .connectString(quorum)
-+            .namespace(nameSpace)
-+            .zookeeperFactory(this.sslZookeeperFactory);
-+    if (connectionTimeout > 0) {
-+      builder = builder.connectionTimeoutMs(connectionTimeout);
-+    }
-+    if (sessionTimeout > 0) {
-+      builder = builder.sessionTimeoutMs(sessionTimeout);
-+    }
-+    if (maxRetries > 0) {
-+      builder = builder.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
-+    }
-+    if (zooKeeperAclProvider != null) {
-+      builder = builder.aclProvider(zooKeeperAclProvider);
-+    }
-+
-+    return builder.build();
-+  }
- 
-   public void removeServerInstanceFromZooKeeper() throws Exception {
-     setDeregisteredWithZooKeeper(true);
-@@ -185,7 +352,9 @@ public void removeServerInstanceFromZooKeeper() throws Exception {
-       znode.close();
-       znode = null;
-     }
--    zooKeeperClient.close();
-+    if (zooKeeperClient != null) {
-+      zooKeeperClient.close();
-+    }
-     LOG.info("Server instance removed from ZooKeeper.");
-   }
- 
-diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
-index fc6a2fd43a..d6a6c96a6a 100644
---- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
-+++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
-@@ -258,7 +258,11 @@ public String toString() {
-       ConfVars.SSL_TRUSTSTORE_PASSWORD.varname,
-       ConfVars.SSL_TRUSTSTORE_PASSWORD.hiveName,
-       ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.varname,
--      ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName
-+      ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName,
-+      ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
-+      ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.hiveName,
-+      ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
-+      ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.hiveName
-   );
- 
-   public static ConfVars getMetaConf(String name) {
-@@ -1072,32 +1076,56 @@ public static ConfVars getMetaConf(String name) {
-             "If ZooKeeper is configured for Kerberos authentication. This could be useful when cluster\n" +
-             "is kerberized, but Zookeeper is not."),
-     THRIFT_ZOOKEEPER_CLIENT_PORT("metastore.zookeeper.client.port",
--            "hive.metastore.zookeeper.client.port", "2181",
-+            "hive.zookeeper.client.port", "2181",
-             "The port of ZooKeeper servers to talk to.\n" +
-                     "If the list of Zookeeper servers specified in hive.metastore.thrift.uris" +
-                     " does not contain port numbers, this value is used."),
-     THRIFT_ZOOKEEPER_SESSION_TIMEOUT("metastore.zookeeper.session.timeout",
--            "hive.metastore.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
-+            "hive.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
-             new TimeValidator(TimeUnit.MILLISECONDS),
-             "ZooKeeper client's session timeout (in milliseconds). The client is disconnected\n" +
-                     "if a heartbeat is not sent in the timeout."),
-     THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT("metastore.zookeeper.connection.timeout",
--            "hive.metastore.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
-+            "hive.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
-             new TimeValidator(TimeUnit.SECONDS),
-             "ZooKeeper client's connection timeout in seconds. " +
-                     "Connection timeout * hive.metastore.zookeeper.connection.max.retries\n" +
-                     "with exponential backoff is when curator client deems connection is lost to zookeeper."),
-     THRIFT_ZOOKEEPER_NAMESPACE("metastore.zookeeper.namespace",
--            "hive.metastore.zookeeper.namespace", "hive_metastore",
-+            "hive.zookeeper.namespace", "hive_metastore",
-             "The parent node under which all ZooKeeper nodes for metastores are created."),
-     THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES("metastore.zookeeper.connection.max.retries",
--            "hive.metastore.zookeeper.connection.max.retries", 3,
-+            "hive.zookeeper.connection.max.retries", 3,
-             "Max number of times to retry when connecting to the ZooKeeper server."),
-     THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME("metastore.zookeeper.connection.basesleeptime",
--            "hive.metastore.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
-+            "hive.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
-             new TimeValidator(TimeUnit.MILLISECONDS),
-             "Initial amount of time (in milliseconds) to wait between retries\n" +
-                     "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
-+    THRIFT_ZOOKEEPER_SSL_ENABLE("metastore.zookeeper.ssl.client.enable",
-+        "hive.zookeeper.ssl.client.enable", false,
-+        "Set client to use TLS when connecting to ZooKeeper.  An explicit value overrides any value set via the " +
-+            "zookeeper.client.secure system property (note the different name).  Defaults to false if neither is set."),
-+    THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION("metastore.zookeeper.ssl.keystore.location",
-+        "hive.zookeeper.ssl.keystore.location", "",
-+        "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
-+            "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
-+            "system property (note the camelCase)."),
-+    THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("metastore.zookeeper.ssl.keystore.password",
-+        "hive.zookeeper.ssl.keystore.password", "",
-+        "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
-+            "Overrides any explicit value set via the zookeeper.ssl.keyStore.password" +
-+            "system property (note the camelCase)."),
-+    THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("metastore.zookeeper.ssl.truststore.location",
-+        "hive.zookeeper.ssl.truststore.location", "",
-+        "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
-+            "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
-+            "system property (note the camelCase)."),
-+    THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("metastore.zookeeper.ssl.truststore.password",
-+        "hive.zookeeper.ssl.truststore.password", "",
-+        "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
-+            "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
-+            "system property (note the camelCase)."),
-     THRIFT_URI_SELECTION("metastore.thrift.uri.selection", "hive.metastore.uri.selection", "RANDOM",
-         new StringSetValidator("RANDOM", "SEQUENTIAL"),
-         "Determines the selection mechanism used by metastore client to connect to remote " +
-@@ -2018,14 +2046,22 @@ public static boolean isEmbeddedMetaStore(String msUri) {
-   }
- 
-   public static ZooKeeperHiveHelper getZKConfig(Configuration conf) {
--    return new ZooKeeperHiveHelper(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS),
--            MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT),
--            MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE),
--            (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
--                    TimeUnit.MILLISECONDS),
--            (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
--                    TimeUnit.MILLISECONDS),
--            MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES));
-+    return ZooKeeperHiveHelper.builder()
-+        .quorum(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS))
-+        .clientPort(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT))
-+        .serverRegistryNameSpace(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE))
-+        .connectionTimeout((int) getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT,
-+            TimeUnit.MILLISECONDS))
-+        .sessionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
-+            TimeUnit.MILLISECONDS))
-+        .baseSleepTime((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
-+            TimeUnit.MILLISECONDS))
-+        .maxRetries(MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES))
-+        .sslEnabled(MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_ENABLE))
-+        .keyStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
-+        .keyStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
-+        .trustStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
-+        .trustStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
-   }
- 
-   /**
-diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
-index b35dc7ce4b..239bff6dc9 100644
---- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
-+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
-@@ -46,6 +46,16 @@
-   public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
-       "hive.cluster.delegation.token.store.zookeeper.acl";
-   public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation";
-+  public static final String DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE =
-+      "hive.cluster.delegation.token.store.zookeeper.ssl.client.enable";
-+  public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION =
-+      "hive.cluster.delegation.token.store.zookeeper.keystore.location";
-+  public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD =
-+      "hive.cluster.delegation.token.store.zookeeper.keystore.password";
-+  public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION =
-+      "hive.cluster.delegation.token.store.zookeeper.truststore.location";
-+  public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD =
-+      "hive.cluster.delegation.token.store.zookeeper.truststore.password";
- 
-   public MetastoreDelegationTokenManager() {
-   }
-diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
-index af52fcc5f6..dd2af7ef1f 100644
---- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
-+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
-@@ -29,8 +29,8 @@
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.api.ACLProvider;
- import org.apache.curator.framework.imps.CuratorFrameworkState;
--import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
- import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
- import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
- import org.apache.hadoop.security.UserGroupInformation;
-@@ -57,12 +57,22 @@
-   protected static final String ZK_SEQ_FORMAT = "%010d";
-   private static final String NODE_KEYS = "/keys";
-   private static final String NODE_TOKENS = "/tokens";
-+  private static final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
-+      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
- 
-   private String rootNode = "";
-   private volatile CuratorFramework zkSession;
-   private String zkConnectString;
-   private int connectTimeoutMillis;
-+  private boolean sslEnabled;
-+  private String keyStoreLocation;
-+  private String keyStorePassword;
-+  private String trustStoreLocation;
-+  private String trustStorePassword;
-+
-   private List<ACL> newNodeAcl;
-+  private Configuration conf;
-+  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
- 
-   /**
-    * ACLProvider permissions will be used in case parent dirs need to be created
-@@ -110,12 +120,7 @@ private boolean isKerberosEnabled(Configuration conf) {
-     }
-   }
- 
--  private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
--      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
--
--  private Configuration conf;
- 
--  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
- 
-   /**
-    * Default constructor for dynamic instantiation w/ Configurable
-@@ -124,14 +129,22 @@ private boolean isKerberosEnabled(Configuration conf) {
-   protected ZooKeeperTokenStore() {
-   }
- 
--  private CuratorFramework getSession() {
-+  public CuratorFramework getSession() {
-     if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
-       synchronized (this) {
-         if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
--          zkSession =
--              CuratorFrameworkFactory.builder().connectString(zkConnectString)
--                  .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
--                  .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
-+          ZooKeeperHiveHelper zkHelper = ZooKeeperHiveHelper.builder()
-+              .quorum(zkConnectString)
-+              .connectionTimeout(connectTimeoutMillis)
-+              .maxRetries(3)
-+              .baseSleepTime(1000)
-+              .sslEnabled(sslEnabled)
-+              .keyStoreLocation(keyStoreLocation)
-+              .keyStorePassword(keyStorePassword)
-+              .trustStoreLocation(trustStoreLocation)
-+              .trustStorePassword(trustStorePassword)
-+              .build();
-+          zkSession = zkHelper.getNewZookeeperClient(aclDefaultProvider);
-           zkSession.start();
-         }
-       }
-@@ -478,10 +491,14 @@ public void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode sMo
-             + WHEN_ZK_DSTORE_MSG);
-       }
-     }
--    connectTimeoutMillis =
--        conf.getInt(
--            MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
--            CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
-+    connectTimeoutMillis = conf.getInt(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
-+        CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
-+
-+    sslEnabled = conf.getBoolean(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, false);
-+    keyStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION, "");
-+    keyStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD, "");
-+    trustStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION, "");
-+    trustStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD, "");
- 
-     String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null);
-     this.newNodeAcl = StringUtils.isNotBlank(aclStr)? parseACLs(aclStr) : getDefaultAcl(conf);
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 34df01e..d50912b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2672,25 +2672,6 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS),
         "Initial amount of time (in milliseconds) to wait between retries\n" +
         "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
-    HIVE_ZOOKEEPER_SSL_ENABLE("hive.zookeeper.ssl.client.enable", false,
-        "Set client to use TLS when connecting to ZooKeeper.  An explicit value overrides any value set via the " +
-            "zookeeper.client.secure system property (note the different name).  Defaults to false if neither is set."),
-    HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION("hive.zookeeper.ssl.keystore.location", "",
-        "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
-            "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
-            "system property (note the camelCase)."),
-    HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("hive.zookeeper.ssl.keystore.password", "",
-        "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
-            "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
-             "system property (note the camelCase)."),
-    HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("hive.zookeeper.ssl.truststore.location", "",
-        "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
-            "Overrides any explicit value set via the zookeeper.ssl.trustStore.location" +
-            "system property (note the camelCase)."),
-    HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("hive.zookeeper.ssl.truststore.password", "",
-        "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
-            "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
-             "system property (note the camelCase)."),
 
     // Transactions
     HIVE_TXN_MANAGER("hive.txn.manager",
@@ -4814,18 +4795,14 @@ public class HiveConf extends Configuration {
             "hive.spark.client.rpc.server.address," +
             "hive.spark.client.rpc.server.port," +
             "hive.spark.client.rpc.sasl.mechanisms," +
-            "bonecp.," +
-            "hive.druid.broker.address.default," +
-            "hive.druid.coordinator.address.default," +
-            "hikaricp.," +
-            "hadoop.bin.path," +
-            "yarn.bin.path," +
-            "spark.home," +
-            "hive.driver.parallel.compilation.global.limit," +
-            "hive.zookeeper.ssl.keystore.location," +
-            "hive.zookeeper.ssl.keystore.password," +
-            "hive.zookeeper.ssl.truststore.location," +
-            "hive.zookeeper.ssl.truststore.password",
+            "bonecp.,"+
+            "hive.druid.broker.address.default,"+
+            "hive.druid.coordinator.address.default,"+
+            "hikaricp.,"+
+            "hadoop.bin.path,"+
+            "yarn.bin.path,"+
+            "spark.home,"+
+            "hive.driver.parallel.compilation.global.limit",
         "Comma separated list of configuration options which are immutable at runtime"),
     HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
         METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname
@@ -4840,11 +4817,7 @@ public class HiveConf extends Configuration {
         + ",fs.s3a.proxy.password"
         + ",dfs.adls.oauth2.credential"
         + ",fs.adl.oauth2.credential"
-        + ",fs.azure.account.oauth2.client.secret"
-        + ",hive.zookeeper.ssl.keystore.location"
-        + ",hive.zookeeper.ssl.keystore.password"
-        + ",hive.zookeeper.ssl.truststore.location"
-        + ",hive.zookeeper.ssl.truststore.password",
+        + ",fs.azure.account.oauth2.client.secret",
         "Comma separated list of configuration options which should not be read by normal user like passwords"),
     HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
         "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
@@ -5646,22 +5619,14 @@ public class HiveConf extends Configuration {
    * given HiveConf.
    */
   public ZooKeeperHiveHelper getZKConfig() {
-    return ZooKeeperHiveHelper.builder()
-      .quorum(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM))
-      .clientPort(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT))
-      .serverRegistryNameSpace(getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE))
-      .connectionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
-          TimeUnit.MILLISECONDS))
-      .sessionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
-          TimeUnit.MILLISECONDS))
-      .baseSleepTime((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
-          TimeUnit.MILLISECONDS))
-      .maxRetries(getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
-      .sslEnabled(getBoolVar(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
-      .keyStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
-      .keyStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
-      .trustStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
-      .trustStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
+    return new ZooKeeperHiveHelper(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM),
+            getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT),
+            getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE),
+            (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+                    TimeUnit.MILLISECONDS),
+            (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+                    TimeUnit.MILLISECONDS),
+            getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES));
   }
 
   public HiveConf() {
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
index 02a8926..1fc8d36 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -50,12 +50,8 @@ public class ZooKeeperStorage implements TempletonStorage {
   public String overhead_path = null;
 
   public static final String ZK_HOSTS = "templeton.zookeeper.hosts";
-  public static final String ZK_SESSION_TIMEOUT = "templeton.zookeeper.session-timeout";
-  public static final String ZK_SSL_ENABLE = "templeton.zookeeper.ssl.client.enable";
-  public static final String ZK_KEYSTORE_LOCATION = "templeton.zookeeper.keystore.location";
-  public static final String ZK_KEYSTORE_PASSWORD = "templeton.zookeeper.keystore.password";
-  public static final String ZK_TRUSTSTORE_LOCATION = "templeton.zookeeper.truststore.location";
-  public static final String ZK_TRUSTSTORE_PASSWORD = "templeton.zookeeper.truststore.password";
+  public static final String ZK_SESSION_TIMEOUT
+    = "templeton.zookeeper.session-timeout";
 
   public static final String ENCODING = "UTF-8";
 
@@ -63,27 +59,29 @@ public class ZooKeeperStorage implements TempletonStorage {
 
   private CuratorFramework zk;
 
-
   /**
    * Open a ZooKeeper connection for the JobState.
    */
-  public static CuratorFramework zkOpen(Configuration conf) throws IOException {
-    ZooKeeperHiveHelper xkHelper = ZooKeeperHiveHelper.builder()
-        .quorum(conf.get(ZK_HOSTS))
-        .sessionTimeout(conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()))
-        .baseSleepTime(1000)
-        .maxRetries(3)
-        .sslEnabled(conf.getBoolean(ZK_SSL_ENABLE, false))
-        .keyStoreLocation(conf.get(ZK_KEYSTORE_LOCATION, ""))
-        .keyStorePassword(conf.get(ZK_KEYSTORE_PASSWORD, ""))
-        .trustStoreLocation(conf.get(ZK_TRUSTSTORE_LOCATION, ""))
-        .trustStorePassword(conf.get(ZK_TRUSTSTORE_PASSWORD, ""))
-        .build();
-    CuratorFramework zk = xkHelper.getNewZookeeperClient();
+  public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
+    throws IOException {
+    //do we need to add a connection status listener?  What will that do?
+    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
+      CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
     zk.start();
     return zk;
   }
 
+  /**
+   * Open a ZooKeeper connection for the JobState.
+   */
+  public static CuratorFramework zkOpen(Configuration conf) throws IOException {
+    /*the silly looking call to Builder below is to get the default value of session timeout
+    from Curator which itself exposes it as system property*/
+    return zkOpen(conf.get(ZK_HOSTS),
+      conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
+  }
+
   public ZooKeeperStorage() {
     // No-op -- this is needed to be able to instantiate the
     // class from the name.
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
similarity index 76%
rename from itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
rename to itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
index 35053e7..603155b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
@@ -25,84 +25,67 @@ import java.util.List;
 
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
 import org.apache.hive.testutils.MiniZooKeeperCluster;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
-import org.junit.AfterClass;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Test;
-
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
 
 /**
  * TestZooKeeperTokenStore.
  */
-public abstract class ZooKeeperTokenStoreTestBase {
-
-  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
-  private static final String TRUST_STORE_NAME = "truststore.jks";
-  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
+public class TestZooKeeperTokenStore {
 
-  private static MiniZooKeeperCluster zkCluster = null;
-  private static int zkPort = -1;
-  private static ZooKeeperTokenStore ts;
-  private static boolean zkSslEnabled;
+  private MiniZooKeeperCluster zkCluster = null;
+  private CuratorFramework zkClient = null;
+  private int zkPort = -1;
+  private ZooKeeperTokenStore ts;
 
-  public static void setUpInternal(boolean sslEnabled) throws Exception{
+  @Before
+  public void setUp() throws Exception {
     File zkDataDir = new File(System.getProperty("test.tmp.dir"));
-    if (zkCluster != null) {
+    if (this.zkCluster != null) {
       throw new IOException("Cluster already running");
     }
-    zkCluster = new MiniZooKeeperCluster(sslEnabled);
-    zkPort = zkCluster.startup(zkDataDir);
-    zkSslEnabled = sslEnabled;
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception{
-    zkCluster.shutdown();
-    zkCluster = null;
+    this.zkCluster = new MiniZooKeeperCluster();
+    this.zkPort = this.zkCluster.startup(zkDataDir);
+    this.zkClient =
+        CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+    this.zkClient.start();
   }
 
   @After
-  public void closeTokenStore() throws Exception{
+  public void tearDown() throws Exception {
+    this.zkClient.close();
     if (ts != null) {
       ts.close();
     }
+    this.zkCluster.shutdown();
+    this.zkCluster = null;
   }
 
   private Configuration createConf(String zkPath) {
     Configuration conf = new Configuration();
     conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:"
-        + zkPort);
+        + this.zkPort);
     conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath);
-    if(zkSslEnabled) {
-      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
-          System.getProperty("test.data.files") :
-          (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
-      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION,
-          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
-      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD,
-          KEY_STORE_TRUST_STORE_PASSWORD);
-      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION,
-          dataFileDir + File.separator + TRUST_STORE_NAME);
-      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD,
-          KEY_STORE_TRUST_STORE_PASSWORD);
-      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, "true");
-
-    }
     return conf;
   }
 
@@ -115,7 +98,6 @@ public abstract class ZooKeeperTokenStoreTestBase {
     ts.setConf(conf);
     ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
 
-    CuratorFramework zkClient = ts.getSession();
 
     String metastore_zk_path = ZK_PATH + ServerMode.METASTORE;
     int keySeq = ts.addMasterKey("key1Data");
@@ -130,7 +112,6 @@ public abstract class ZooKeeperTokenStoreTestBase {
 
     ts.removeMasterKey(keySeq);
     assertEquals("expected number keys", 1, ts.getMasterKeys().length);
-    ts.removeMasterKey(keySeq2);
 
     // tokens
     DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(
@@ -208,8 +189,6 @@ public abstract class ZooKeeperTokenStoreTestBase {
     ts = new ZooKeeperTokenStore();
     ts.setConf(conf);
     ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
-
-    CuratorFramework zkClient = ts.getSession();
     List<ACL> acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE);
     assertEquals(2, acl.size());
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
deleted file mode 100644
index 084e097..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.security;
-
-import org.junit.BeforeClass;
-
-/**
- * TestZookeeperTokenStore with zookeeper SSL communication disabled.
- */
-public class TestZookeeperTokenStorePlain extends ZooKeeperTokenStoreTestBase {
-
-  public TestZookeeperTokenStorePlain(){
-    super();
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    setUpInternal(false);
-  }
-}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
deleted file mode 100644
index 4017990..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.security;
-
-import org.junit.BeforeClass;
-
-/**
- * TestZookeeperTokenStore with zookeeper SSL communication enabled.
- */
-public class TestZookeeperTokenStoreSSLEnabled extends ZooKeeperTokenStoreTestBase {
-
-  public TestZookeeperTokenStoreSSLEnabled(){
-    super();
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    setUpInternal(true);
-  }
-}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
index 596c3d6..cc32a7e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
@@ -111,10 +111,6 @@ public class TestRestrictedList {
     addToExpectedRestrictedMap("spark.home");
     addToExpectedRestrictedMap("hive.privilege.synchronizer.interval");
     addToExpectedRestrictedMap("hive.driver.parallel.compilation.global.limit");
-    addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.location");
-    addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.password");
-    addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.location");
-    addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.password");
   }
 
   @AfterClass
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
index 3322434..bd5e811 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
@@ -22,10 +22,11 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.nodes.PersistentNode;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
@@ -164,8 +165,9 @@ public class TestServiceDiscovery {
     // Publish configs for this instance as the data on the node
     znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confs);
     byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
-    PersistentNode znode = new PersistentNode(client, CreateMode.EPHEMERAL_SEQUENTIAL,
-        false, pathPrefix, znodeDataUTF8);
+    PersistentEphemeralNode znode =
+      new PersistentEphemeralNode(client,
+        PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
     znode.start();
     // We'll wait for 120s for node creation
     long znodeCreationTimeout = 120;
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
similarity index 95%
rename from itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
rename to itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
index 7302e09..de2e493 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
@@ -19,7 +19,6 @@
 package org.apache.hive.service.server;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -55,14 +54,14 @@ import org.apache.hive.service.cli.CLIServiceClient;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.SessionHandle;
-import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
  * Test restricted information schema with privilege synchronization
  */
-public abstract class InformationSchemaWithPrivilegeTestBase {
+public class TestInformationSchemaWithPrivilege {
 
   // Group mapping:
   // group_a: user1, user2
@@ -176,18 +175,14 @@ public abstract class InformationSchemaWithPrivilegeTestBase {
     }
   }
 
-  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
-  private static final String TRUST_STORE_NAME = "truststore.jks";
-  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
-
   private static MiniHS2 miniHS2 = null;
   private static MiniZooKeeperCluster zkCluster = null;
   private static Map<String, String> confOverlay;
 
-
-  public static void setupInternal(boolean zookeeperSSLEnabled) throws Exception {
+  @BeforeClass
+  public static void beforeTest() throws Exception {
     File zkDataDir = new File(System.getProperty("test.tmp.dir"));
-    zkCluster = new MiniZooKeeperCluster(zookeeperSSLEnabled);
+    zkCluster = new MiniZooKeeperCluster();
     int zkPort = zkCluster.startup(zkDataDir);
 
     miniHS2 = new MiniHS2(new HiveConf());
@@ -211,34 +206,9 @@ public abstract class InformationSchemaWithPrivilegeTestBase {
     confOverlay.put(ConfVars.HIVE_AUTHENTICATOR_MANAGER.varname, FakeGroupAuthenticator.class.getName());
     confOverlay.put(ConfVars.HIVE_AUTHORIZATION_ENABLED.varname, "true");
     confOverlay.put(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST.varname, ".*");
-
-    if(zookeeperSSLEnabled) {
-      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
-          System.getProperty("test.data.files") :
-          (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
-      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION.varname,
-          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
-      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
-          KEY_STORE_TRUST_STORE_PASSWORD);
-      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION.varname,
-          dataFileDir + File.separator + TRUST_STORE_NAME);
-      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
-          KEY_STORE_TRUST_STORE_PASSWORD);
-      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE.varname, "true");
-    }
     miniHS2.start(confOverlay);
   }
 
-  @AfterClass
-  public static void tearDown() throws IOException {
-    if (miniHS2 != null) {
-      miniHS2.stop();
-    }
-    if (zkCluster != null) {
-      zkCluster.shutdown();
-    }
-  }
-
   @Test
   public void test() throws Exception {
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
deleted file mode 100644
index ffa1843..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.service.server;
-
-import org.junit.BeforeClass;
-
-/**
- * Test restricted information schema with privilege synchronization with Zookeeper SSL communication disabled.
- */
-public class TestInformationSchemaWithPrivilegeZookeeperPlain extends InformationSchemaWithPrivilegeTestBase {
-
-  public TestInformationSchemaWithPrivilegeZookeeperPlain() {
-    super();
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception{
-    setupInternal(false);
-  }
-}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
deleted file mode 100644
index e12f494..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.service.server;
-
-import org.junit.BeforeClass;
-
-/**
- * Test restricted information schema with privilege synchronization with Zookeeper SSL communication enabled.
- */
-public class TestInformationSchemaWithPrivilegeZookeeperSSL extends InformationSchemaWithPrivilegeTestBase {
-
-  public TestInformationSchemaWithPrivilegeZookeeperSSL() {
-    super();
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception{
-    setupInternal(true);
-  }
-}
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 6cb6853..e23826e 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -116,11 +116,6 @@ public class Utils {
     public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
     public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA";
     public static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
-    public static final String ZOOKEEPER_SSL_ENABLE = "zooKeeperSSLEnable";
-    public static final String ZOOKEEPER_KEYSTORE_LOCATION = "zooKeeperKeystoreLocation";
-    public static final String ZOOKEEPER_KEYSTORE_PASSWORD= "zooKeeperKeystorePassword";
-    public static final String ZOOKEEPER_TRUSTSTORE_LOCATION  = "zooKeeperTruststoreLocation";
-    public static final String ZOOKEEPER_TRUSTSTORE_PASSWORD = "zooKeeperTruststorePassword";
     // Default namespace value on ZooKeeper.
     // This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
     static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
@@ -173,11 +168,6 @@ public class Utils {
     private boolean isEmbeddedMode = false;
     private String suppliedURLAuthority;
     private String zooKeeperEnsemble = null;
-    private boolean zooKeeperSslEnabled = false;
-    private String zookeeperKeyStoreLocation = "";
-    private String zookeeperKeyStorePassword = "";
-    private String zookeeperTrustStoreLocation = "";
-    private String zookeeperTrustStorePassword = "";
     private String currentHostZnodePath;
     private final List<String> rejectedHostZnodePaths = new ArrayList<String>();
 
@@ -195,12 +185,6 @@ public class Utils {
       this.isEmbeddedMode = params.isEmbeddedMode;
       this.suppliedURLAuthority = params.suppliedURLAuthority;
       this.zooKeeperEnsemble = params.zooKeeperEnsemble;
-      this.zooKeeperSslEnabled = params.zooKeeperSslEnabled;
-      this.zookeeperKeyStoreLocation = params.zookeeperKeyStoreLocation;
-      this.zookeeperKeyStorePassword = params.zookeeperKeyStorePassword;
-      this.zookeeperTrustStoreLocation = params.zookeeperTrustStoreLocation;
-      this.zookeeperTrustStorePassword = params.zookeeperTrustStorePassword;
-
       this.currentHostZnodePath = params.currentHostZnodePath;
       this.rejectedHostZnodePaths.addAll(rejectedHostZnodePaths);
     }
@@ -244,25 +228,6 @@ public class Utils {
     public String getZooKeeperEnsemble() {
       return zooKeeperEnsemble;
     }
-    public boolean isZooKeeperSslEnabled() {
-      return zooKeeperSslEnabled;
-    }
-
-    public String getZookeeperKeyStoreLocation() {
-      return zookeeperKeyStoreLocation;
-    }
-
-    public String getZookeeperKeyStorePassword() {
-      return zookeeperKeyStorePassword;
-    }
-
-    public String getZookeeperTrustStoreLocation() {
-      return zookeeperTrustStoreLocation;
-    }
-
-    public String getZookeeperTrustStorePassword() {
-      return zookeeperTrustStorePassword;
-    }
 
     public List<String> getRejectedHostZnodePaths() {
       return rejectedHostZnodePaths;
@@ -312,26 +277,6 @@ public class Utils {
       this.zooKeeperEnsemble = zooKeeperEnsemble;
     }
 
-    public void setZooKeeperSslEnabled(boolean zooKeeperSslEnabled) {
-      this.zooKeeperSslEnabled = zooKeeperSslEnabled;
-    }
-
-    public void setZookeeperKeyStoreLocation(String zookeeperKeyStoreLocation) {
-      this.zookeeperKeyStoreLocation = zookeeperKeyStoreLocation;
-    }
-
-    public void setZookeeperKeyStorePassword(String zookeeperKeyStorePassword) {
-      this.zookeeperKeyStorePassword = zookeeperKeyStorePassword;
-    }
-
-    public void setZookeeperTrustStoreLocation(String zookeeperTrustStoreLocation) {
-      this.zookeeperTrustStoreLocation = zookeeperTrustStoreLocation;
-    }
-
-    public void setZookeeperTrustStorePassword(String zookeeperTrustStorePassword) {
-      this.zookeeperTrustStorePassword = zookeeperTrustStorePassword;
-    }
-
     public void setCurrentHostZnodePath(String currentHostZnodePath) {
       this.currentHostZnodePath = currentHostZnodePath;
     }
@@ -540,7 +485,6 @@ public class Utils {
         uri = uri.replace(dummyAuthorityString, authorityStr);
         // Set ZooKeeper ensemble in connParams for later use
         connParams.setZooKeeperEnsemble(authorityStr);
-        ZooKeeperHiveClientHelper.setZkSSLParams(connParams);
       } else {
         URI jdbcBaseURI = URI.create(URI_HIVE_PREFIX + "//" + authorityStr);
         // Check to prevent unintentional use of embedded mode. A missing "/"
@@ -632,6 +576,7 @@ public class Utils {
    * host:port pairs.
    *
    * @param uri
+   * @param connParams
    * @return
    * @throws JdbcUriParseException
    */
diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
index 3d89fa2..759ba8a 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
@@ -32,7 +32,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.SSLZookeeperFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
 import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
@@ -86,40 +85,11 @@ class ZooKeeperHiveClientHelper {
       JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode));
   }
 
-  /**
-   * Parse and set up the SSL communication related Zookeeper params in connParams from sessionVars.
-   * @param connParams
-   */
-  public static void setZkSSLParams(JdbcConnectionParams connParams) {
-    Map<String, String> sessionConf = connParams.getSessionVars();
-    boolean sslEnabled = false;
-    if (sessionConf.containsKey(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE)) {
-      sslEnabled = Boolean.parseBoolean(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE));
-      connParams.setZooKeeperSslEnabled(sslEnabled);
-    }
-    if (sslEnabled) {
-      connParams.setZookeeperKeyStoreLocation(
-          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_LOCATION), ""));
-      connParams.setZookeeperKeyStorePassword(
-          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_PASSWORD), ""));
-      connParams.setZookeeperTrustStoreLocation(
-          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_LOCATION), ""));
-      connParams.setZookeeperTrustStorePassword(
-          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_PASSWORD), ""));
-    }
-  }
-
   private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception {
     String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
     CuratorFramework zooKeeperClient =
-        CuratorFrameworkFactory.builder()
-            .connectString(zooKeeperEnsemble)
-            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
-            .zookeeperFactory(
-            new SSLZookeeperFactory(connParams.isZooKeeperSslEnabled(), connParams.getZookeeperKeyStoreLocation(),
-                connParams.getZookeeperKeyStorePassword(), connParams.getZookeeperTrustStoreLocation(),
-                connParams.getZookeeperTrustStorePassword()))
-            .build();
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
     zooKeeperClient.start();
     return zooKeeperClient;
   }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 2b21baa..d28fd17 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -30,18 +30,20 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.nodes.PersistentNode;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapUtil;
@@ -53,7 +55,6 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMars
 import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.InvalidACLException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
@@ -110,7 +111,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private final Map<String, Set<InstanceType>> nodeToInstanceCache;
 
   // The registration znode.
-  private PersistentNode znode;
+  private PersistentEphemeralNode znode;
   private String znodePath; // unique identity for this instance
 
   private PathChildrenCache instancesCache; // Created on demand.
@@ -211,23 +212,28 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   }
 
   private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
-    return ZooKeeperHiveHelper.builder()
-        .quorum(conf.get(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname))
-        .clientPort(conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
-            ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()))
-        .connectionTimeout(
-            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
-        .sessionTimeout(
-            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS))
-        .baseSleepTime(
-            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS))
-        .maxRetries(HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
-        .sslEnabled(HiveConf.getBoolVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
-        .keyStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
-        .keyStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
-        .trustStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
-        .trustStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD))
-        .build().getNewZookeeperClient(zooKeeperAclProvider, namespace);
+    String zkEnsemble = getQuorumServers(conf);
+    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int connectionTimeout = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+
+    LOG.info("Creating curator client with connectString: {} sessionTimeoutMs: {} connectionTimeoutMs: {}" +
+      " namespace: {} exponentialBackoff - sleepTime: {} maxRetries: {}", zkEnsemble, sessionTimeout,
+      connectionTimeout, namespace, baseSleepTime, maxRetries);
+    // Create a CuratorFramework instance to be used as the ZooKeeper client
+    // Use the zooKeeperAclProvider to create appropriate ACLs
+    return CuratorFrameworkFactory.builder()
+      .connectString(zkEnsemble)
+      .sessionTimeoutMs(sessionTimeout)
+      .connectionTimeoutMs(connectionTimeout)
+      .aclProvider(zooKeeperAclProvider)
+      .namespace(namespace)
+      .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+      .build();
   }
 
   private static List<ACL> createSecureAcls() {
@@ -277,9 +283,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
-      // PersistentNode will make sure the ephemeral node created on server will be present
+      // PersistentEphemeralNode will make sure the ephemeral node created on server will be present
       // even under connection or session interruption (will automatically handle retries)
-      znode = new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false,
+      znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
           workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
     
       // start the creation of znodes
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
index e8eaac0..fa3a382 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
 
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.hive.conf.HiveConf;
 
 public class CuratorFrameworkSingleton {
@@ -47,8 +50,15 @@ public class CuratorFrameworkSingleton {
       } else {
         conf = hiveConf;
       }
+      int sessionTimeout =  (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+      int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+      int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+      String quorumServers = conf.getZKConfig().getQuorumServers();
 
-      sharedClient = conf.getZKConfig().getNewZookeeperClient();
+      sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers)
+          .sessionTimeoutMs(sessionTimeout)
+          .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+          .build();
       sharedClient.start();
     }
 
diff --git a/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java b/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
index dc11ae1..eec6282 100644
--- a/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
+++ b/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
@@ -34,9 +34,7 @@ import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.zookeeper.common.ClientX509Util;
-import org.apache.zookeeper.common.X509Util;
-import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.slf4j.Logger;
@@ -56,9 +54,6 @@ public class MiniZooKeeperCluster {
 
   private static final int TICK_TIME = 2000;
   private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
-  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
-  private static final String TRUST_STORE_NAME = "truststore.jks";
-  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
   private int connectionTimeout;
 
   private boolean started;
@@ -66,7 +61,7 @@ public class MiniZooKeeperCluster {
   /** The default port. If zero, we use a random port. */
   private int defaultClientPort = 0;
 
-  private List<ServerCnxnFactory> standaloneServerFactoryList;
+  private List<NIOServerCnxnFactory> standaloneServerFactoryList;
   private List<ZooKeeperServer> zooKeeperServers;
   private List<Integer> clientPortList;
 
@@ -75,20 +70,11 @@ public class MiniZooKeeperCluster {
 
   private Configuration configuration;
 
-  private boolean sslEnabled = false;
-
   public MiniZooKeeperCluster() {
     this(new Configuration());
   }
-  public MiniZooKeeperCluster(boolean sslEnabled) {
-    this(new Configuration(), sslEnabled);
-  }
-  public MiniZooKeeperCluster(Configuration configuration) {
-    this(configuration, false);
-  }
 
-  public MiniZooKeeperCluster(Configuration configuration, boolean sslEnabled) {
-    this.sslEnabled = sslEnabled;
+  public MiniZooKeeperCluster(Configuration configuration) {
     this.started = false;
     this.configuration = configuration;
     activeZKServerIndex = -1;
@@ -181,7 +167,7 @@ public class MiniZooKeeperCluster {
   }
 
   // / XXX: From o.a.zk.t.ClientBase
-  private static void setupTestEnv(boolean sslEnabled) {
+  private static void setupTestEnv() {
     // With ZooKeeper 3.5 we need to whitelist the 4 letter commands we use
     System.setProperty("zookeeper.4lw.commands.whitelist", "*");
 
@@ -190,9 +176,6 @@ public class MiniZooKeeperCluster {
     // resulting in test failure (client timeout on first session).
     // set env and directly in order to handle static init/gc issues
     System.setProperty("zookeeper.preAllocSize", "100");
-    if (sslEnabled) {
-      System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
-    }
     FileTxnLog.setPreallocSize(100 * 1024);
   }
 
@@ -217,7 +200,7 @@ public class MiniZooKeeperCluster {
       return -1;
     }
 
-    setupTestEnv(sslEnabled);
+    setupTestEnv();
     shutdown();
 
     int tentativePort = -1; // the seed port
@@ -246,10 +229,12 @@ public class MiniZooKeeperCluster {
       // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
       server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
       server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
-      ServerCnxnFactory standaloneServerFactory;
+      NIOServerCnxnFactory standaloneServerFactory;
       while (true) {
         try {
-          standaloneServerFactory = createServerCnxnFactory(currentClientPort);
+          standaloneServerFactory = new NIOServerCnxnFactory();
+          standaloneServerFactory.configure(new InetSocketAddress(currentClientPort),
+              configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
         } catch (BindException e) {
           LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
           // We're told to use some port but it's occupied, fail
@@ -267,7 +252,7 @@ public class MiniZooKeeperCluster {
       // Start up this ZK server
       standaloneServerFactory.startup(server);
       // Runs a 'stat' against the servers.
-      if (!sslEnabled && !waitForServerUp(currentClientPort, connectionTimeout)) {
+      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
         throw new IOException("Waiting for startup of standalone server");
       }
 
@@ -291,36 +276,6 @@ public class MiniZooKeeperCluster {
     return clientPort;
   }
 
-  private ServerCnxnFactory createServerCnxnFactory(int currentClientPort) throws IOException {
-    ServerCnxnFactory serverCnxnFactory = null;
-    if (sslEnabled) {
-      System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
-          "org.apache.zookeeper.server.NettyServerCnxnFactory");
-      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
-              System.getProperty("test.data.files") :
-              configuration.get("test.data.files").replace('\\', '/').replace("c:", "");
-      X509Util x509Util = new ClientX509Util();
-      System.setProperty(x509Util.getSslKeystoreLocationProperty(),
-          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
-      System.setProperty(x509Util.getSslKeystorePasswdProperty(),
-          KEY_STORE_TRUST_STORE_PASSWORD);
-      System.setProperty(x509Util.getSslTruststoreLocationProperty(),
-          dataFileDir + File.separator + TRUST_STORE_NAME);
-      System.setProperty(x509Util.getSslTruststorePasswdProperty(),
-          KEY_STORE_TRUST_STORE_PASSWORD);
-      serverCnxnFactory = ServerCnxnFactory.createFactory();
-      serverCnxnFactory.configure(new InetSocketAddress(currentClientPort),
-          configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS),
-          true);
-    } else {
-      serverCnxnFactory = ServerCnxnFactory.createFactory();
-      serverCnxnFactory.configure(new InetSocketAddress(currentClientPort),
-          configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
-    }
-    return serverCnxnFactory;
-  }
-
-
   private void createDir(File dir) throws IOException {
     try {
       if (!dir.exists()) {
@@ -337,7 +292,7 @@ public class MiniZooKeeperCluster {
   public void shutdown() throws IOException {
     // shut down all the zk servers
     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
-      ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
+      NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
       int clientPort = clientPortList.get(i);
 
       standaloneServerFactory.shutdown();
@@ -350,7 +305,6 @@ public class MiniZooKeeperCluster {
     for (ZooKeeperServer zkServer : zooKeeperServers) {
       //explicitly close ZKDatabase since ZookeeperServer does not close them
       zkServer.getZKDatabase().close();
-      zkServer.shutdown(true);
     }
     zooKeeperServers.clear();
 
@@ -374,7 +328,7 @@ public class MiniZooKeeperCluster {
     }
 
     // Shutdown the current active one
-    ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
+    NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
     int clientPort = clientPortList.get(activeZKServerIndex);
 
     standaloneServerFactory.shutdown();
@@ -412,7 +366,7 @@ public class MiniZooKeeperCluster {
 
     int backupZKServerIndex = activeZKServerIndex + 1;
     // Shutdown the current active one
-    ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
+    NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
     int clientPort = clientPortList.get(backupZKServerIndex);
 
     standaloneServerFactory.shutdown();
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 181ea5d..fece82e 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -43,12 +43,14 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JvmPauseMonitor;
 import org.apache.hadoop.hive.common.LogUtils;
@@ -1076,9 +1078,14 @@ public class HiveServer2 extends CompositeService {
    */
   static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
     HiveConf hiveConf = new HiveConf();
-    CuratorFramework zooKeeperClient = hiveConf.getZKConfig().getNewZookeeperClient();
-    zooKeeperClient.start();
+    String zooKeeperEnsemble = hiveConf.getZKConfig().getQuorumServers();
     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+    int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+    CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
+    zooKeeperClient.start();
     List<String> znodePaths =
         zooKeeperClient.getChildren().forPath(
             ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
deleted file mode 100644
index ee01731..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.utils.ZookeeperFactory;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.client.ZKClientConfig;
-import org.apache.zookeeper.common.ClientX509Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory to create Zookeeper clients with the zookeeper.client.secure enabled,
- * allowing SSL communication with the Zookeeper server.
- */
-public class SSLZookeeperFactory implements ZookeeperFactory {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SSLZookeeperFactory.class);
-
-  private boolean sslEnabled;
-  private String keyStoreLocation;
-  private String keyStorePassword;
-  private String trustStoreLocation;
-  private String trustStorePassword;
-
-  public SSLZookeeperFactory(boolean sslEnabled, String keyStoreLocation, String keyStorePassword,
-      String trustStoreLocation, String trustStorePassword) {
-
-    this.sslEnabled = sslEnabled;
-    this.keyStoreLocation = keyStoreLocation;
-    this.keyStorePassword = keyStorePassword;
-    this.trustStoreLocation = trustStoreLocation;
-    this.trustStorePassword = trustStorePassword;
-    if (sslEnabled) {
-      if (StringUtils.isEmpty(keyStoreLocation)) {
-        LOG.warn("Missing keystoreLocation parameter");
-      }
-      if (StringUtils.isEmpty(trustStoreLocation)) {
-        LOG.warn("Missing trustStoreLocation parameter");
-      }
-    }
-  }
-
-  @Override
-  public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
-      boolean canBeReadOnly) throws Exception {
-    if (!this.sslEnabled) {
-      return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
-    }
-    ZKClientConfig clientConfig = new ZKClientConfig();
-    clientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
-    clientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
-    ClientX509Util x509Util = new ClientX509Util();
-    clientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(), this.keyStoreLocation);
-    clientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(), this.keyStorePassword);
-    clientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(), this.trustStoreLocation);
-    clientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(), this.trustStorePassword);
-    return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, clientConfig);
-  }
-}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
index 71d8651..99f7c97 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
@@ -23,11 +23,10 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.List;
-
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.nodes.PersistentNode;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -56,164 +55,28 @@ public class ZooKeeperHiveHelper {
   public static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHiveHelper.class.getName());
   public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
 
-  /**
-   * ZooKeeperHiveHelperBuilder. A builder class to initialize ZooKeeperHiveHelper.
-   */
-  public static class ZooKeeperHiveHelperBuilder {
-    private String quorum = null;
-    private String clientPort = null;
-    private String serverRegistryNameSpace = null;
-    private int connectionTimeout;
-    private int sessionTimeout;
-    private int baseSleepTime;
-    private int maxRetries;
-    private boolean sslEnabled = false;
-    private String keyStoreLocation = null;
-    private String keyStorePassword = null;
-    private String trustStoreLocation = null;
-    private String trustStorePassword = null;
-
-    public ZooKeeperHiveHelper build() {
-      return new ZooKeeperHiveHelper(this);
-    }
-
-    public ZooKeeperHiveHelperBuilder quorum(String quorum) {
-      this.quorum = quorum;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder clientPort(String clientPort) {
-      this.clientPort = clientPort;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder serverRegistryNameSpace(String serverRegistryNameSpace) {
-      this.serverRegistryNameSpace = serverRegistryNameSpace;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder connectionTimeout(int connectionTimeout) {
-      this.connectionTimeout = connectionTimeout;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder sessionTimeout(int sessionTimeout) {
-      this.sessionTimeout = sessionTimeout;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder baseSleepTime(int baseSleepTime) {
-      this.baseSleepTime = baseSleepTime;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder maxRetries(int maxRetries) {
-      this.maxRetries = maxRetries;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder sslEnabled(boolean sslEnabled) {
-      this.sslEnabled = sslEnabled;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder keyStoreLocation(String keyStoreLocation) {
-      this.keyStoreLocation = keyStoreLocation;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder keyStorePassword(String keyStorePassword) {
-      this.keyStorePassword = keyStorePassword;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder trustStoreLocation(String trustStoreLocation) {
-      this.trustStoreLocation = trustStoreLocation;
-      return this;
-    }
-
-    public ZooKeeperHiveHelperBuilder trustStorePassword(String trustStorePassword) {
-      this.trustStorePassword = trustStorePassword;
-      return this;
-    }
-
-    public String getQuorum() {
-      return quorum;
-    }
-
-    public String getClientPort() {
-      return clientPort;
-    }
-
-    public String getServerRegistryNameSpace() {
-      return serverRegistryNameSpace;
-    }
-
-    public int getConnectionTimeout() {
-      return connectionTimeout;
-    }
-
-    public int getSessionTimeout() {
-      return sessionTimeout;
-    }
-
-    public int getBaseSleepTime() {
-      return baseSleepTime;
-    }
-
-    public int getMaxRetries() {
-      return maxRetries;
-    }
-
-    public boolean isSslEnabled() {
-      return sslEnabled;
-    }
-
-    public String getKeyStoreLocation() {
-      return keyStoreLocation;
-    }
-
-    public String getKeyStorePassword() {
-      return keyStorePassword;
-    }
-
-    public String getTrustStoreLocation() {
-      return trustStoreLocation;
-    }
-
-    public String getTrustStorePassword() {
-      return trustStorePassword;
-    }
-  }
-
-  public static ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder builder() {
-    return new ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder();
-  }
-
-  private String quorum;
-  private String rootNamespace;
-  private int connectionTimeout;
+  private String quorum = null;
+  private String clientPort = null;
+  private String rootNamespace = null;
+  private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
   private int sessionTimeout;
   private int baseSleepTime;
   private int maxRetries;
-  private boolean sslEnabled;
 
-  private SSLZookeeperFactory sslZookeeperFactory;
   private CuratorFramework zooKeeperClient;
-  private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
-  private PersistentNode znode;
-
+  private PersistentEphemeralNode znode;
 
-  public ZooKeeperHiveHelper(ZooKeeperHiveHelperBuilder builder) {
+  public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespace,
+                             int sessionTimeout, int baseSleepTime, int maxRetries) {
     // Get the ensemble server addresses in the format host1:port1, host2:port2, ... . Append
     // the configured port to hostname if the hostname doesn't contain a port.
-    String[] hosts = builder.getQuorum().split(",");
+    String[] hosts = quorum.split(",");
     StringBuilder quorumServers = new StringBuilder();
     for (int i = 0; i < hosts.length; i++) {
       quorumServers.append(hosts[i].trim());
       if (!hosts[i].contains(":")) {
         quorumServers.append(":");
-        quorumServers.append(builder.getClientPort());
+        quorumServers.append(clientPort);
       }
 
       if (i != hosts.length - 1) {
@@ -222,19 +85,11 @@ public class ZooKeeperHiveHelper {
     }
 
     this.quorum = quorumServers.toString();
-    this.rootNamespace = builder.getServerRegistryNameSpace();
-    this.connectionTimeout = builder.getConnectionTimeout();
-    this.sessionTimeout = builder.getSessionTimeout();
-    this.baseSleepTime = builder.getBaseSleepTime();
-    this.maxRetries = builder.getMaxRetries();
-    this.sslEnabled = builder.isSslEnabled();
-    this.sslZookeeperFactory =
-        new SSLZookeeperFactory(sslEnabled,
-            builder.getKeyStoreLocation(),
-            builder.getKeyStorePassword(),
-            builder.getTrustStoreLocation(),
-            builder.getTrustStorePassword());
-
+    this.clientPort = clientPort;
+    this.rootNamespace = rootNamespace;
+    this.sessionTimeout = sessionTimeout;
+    this.baseSleepTime = baseSleepTime;
+    this.maxRetries = maxRetries;
   }
 
   /**
@@ -263,7 +118,8 @@ public class ZooKeeperHiveHelper {
                       + ZOOKEEPER_PATH_SEPARATOR + znodePathPrefix;
       byte[] znodeDataUTF8 = znodeData.getBytes(StandardCharsets.UTF_8);
       znode =
-              new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false, pathPrefix, znodeDataUTF8);
+              new PersistentEphemeralNode(zooKeeperClient,
+                      PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
       znode.start();
       // We'll wait for 120s for node creation
       long znodeCreationTimeout = 120;
@@ -291,7 +147,17 @@ public class ZooKeeperHiveHelper {
 
   public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
                                                boolean addParentNode) throws Exception {
-    CuratorFramework zkClient = getNewZookeeperClient(zooKeeperAclProvider);
+    String zooKeeperEnsemble = getQuorumServers();
+    // Create a CuratorFramework instance to be used as the ZooKeeper client.
+    // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+            .connectString(zooKeeperEnsemble)
+            .sessionTimeoutMs(sessionTimeout)
+            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
+    if (zooKeeperAclProvider != null) {
+      builder = builder.aclProvider(zooKeeperAclProvider);
+    }
+    CuratorFramework zkClient = builder.build();
     zkClient.start();
 
     // Create the parent znodes recursively; ignore if the parent already exists.
@@ -311,39 +177,6 @@ public class ZooKeeperHiveHelper {
     }
     return zkClient;
   }
-  public CuratorFramework getNewZookeeperClient() {
-    return getNewZookeeperClient(null, null);
-  }
-  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider) {
-    return getNewZookeeperClient(zooKeeperAclProvider, null);
-  }
-
-  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider, String nameSpace) {
-    LOG.info("Creating curator client with connectString: {} namespace: {} sessionTimeoutMs: {}" +
-            " connectionTimeoutMs: {} exponentialBackoff - sleepTime: {} maxRetries: {} sslEnabled: {}",
-        quorum, nameSpace,  sessionTimeout,
-        connectionTimeout, baseSleepTime, maxRetries, sslEnabled);
-    // Create a CuratorFramework instance to be used as the ZooKeeper client.
-    // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
-    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-            .connectString(quorum)
-            .namespace(nameSpace)
-            .zookeeperFactory(this.sslZookeeperFactory);
-    if (connectionTimeout > 0) {
-      builder = builder.connectionTimeoutMs(connectionTimeout);
-    }
-    if (sessionTimeout > 0) {
-      builder = builder.sessionTimeoutMs(sessionTimeout);
-    }
-    if (maxRetries > 0) {
-      builder = builder.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
-    }
-    if (zooKeeperAclProvider != null) {
-      builder = builder.aclProvider(zooKeeperAclProvider);
-    }
-
-    return builder.build();
-  }
 
   public void removeServerInstanceFromZooKeeper() throws Exception {
     setDeregisteredWithZooKeeper(true);
@@ -352,9 +185,7 @@ public class ZooKeeperHiveHelper {
       znode.close();
       znode = null;
     }
-    if (zooKeeperClient != null) {
-      zooKeeperClient.close();
-    }
+    zooKeeperClient.close();
     LOG.info("Server instance removed from ZooKeeper.");
   }
 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index d6a6c96..fc6a2fd 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -258,11 +258,7 @@ public class MetastoreConf {
       ConfVars.SSL_TRUSTSTORE_PASSWORD.varname,
       ConfVars.SSL_TRUSTSTORE_PASSWORD.hiveName,
       ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.varname,
-      ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName,
-      ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
-      ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.hiveName,
-      ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
-      ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.hiveName
+      ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName
   );
 
   public static ConfVars getMetaConf(String name) {
@@ -1076,56 +1072,32 @@ public class MetastoreConf {
             "If ZooKeeper is configured for Kerberos authentication. This could be useful when cluster\n" +
             "is kerberized, but Zookeeper is not."),
     THRIFT_ZOOKEEPER_CLIENT_PORT("metastore.zookeeper.client.port",
-            "hive.zookeeper.client.port", "2181",
+            "hive.metastore.zookeeper.client.port", "2181",
             "The port of ZooKeeper servers to talk to.\n" +
                     "If the list of Zookeeper servers specified in hive.metastore.thrift.uris" +
                     " does not contain port numbers, this value is used."),
     THRIFT_ZOOKEEPER_SESSION_TIMEOUT("metastore.zookeeper.session.timeout",
-            "hive.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
+            "hive.metastore.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
             new TimeValidator(TimeUnit.MILLISECONDS),
             "ZooKeeper client's session timeout (in milliseconds). The client is disconnected\n" +
                     "if a heartbeat is not sent in the timeout."),
     THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT("metastore.zookeeper.connection.timeout",
-            "hive.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
+            "hive.metastore.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
             new TimeValidator(TimeUnit.SECONDS),
             "ZooKeeper client's connection timeout in seconds. " +
                     "Connection timeout * hive.metastore.zookeeper.connection.max.retries\n" +
                     "with exponential backoff is when curator client deems connection is lost to zookeeper."),
     THRIFT_ZOOKEEPER_NAMESPACE("metastore.zookeeper.namespace",
-            "hive.zookeeper.namespace", "hive_metastore",
+            "hive.metastore.zookeeper.namespace", "hive_metastore",
             "The parent node under which all ZooKeeper nodes for metastores are created."),
     THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES("metastore.zookeeper.connection.max.retries",
-            "hive.zookeeper.connection.max.retries", 3,
+            "hive.metastore.zookeeper.connection.max.retries", 3,
             "Max number of times to retry when connecting to the ZooKeeper server."),
     THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME("metastore.zookeeper.connection.basesleeptime",
-            "hive.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
+            "hive.metastore.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
             new TimeValidator(TimeUnit.MILLISECONDS),
             "Initial amount of time (in milliseconds) to wait between retries\n" +
                     "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
-    THRIFT_ZOOKEEPER_SSL_ENABLE("metastore.zookeeper.ssl.client.enable",
-        "hive.zookeeper.ssl.client.enable", false,
-        "Set client to use TLS when connecting to ZooKeeper.  An explicit value overrides any value set via the " +
-            "zookeeper.client.secure system property (note the different name).  Defaults to false if neither is set."),
-    THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION("metastore.zookeeper.ssl.keystore.location",
-        "hive.zookeeper.ssl.keystore.location", "",
-        "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
-            "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
-            "system property (note the camelCase)."),
-    THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("metastore.zookeeper.ssl.keystore.password",
-        "hive.zookeeper.ssl.keystore.password", "",
-        "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
-            "Overrides any explicit value set via the zookeeper.ssl.keyStore.password" +
-            "system property (note the camelCase)."),
-    THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("metastore.zookeeper.ssl.truststore.location",
-        "hive.zookeeper.ssl.truststore.location", "",
-        "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
-            "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
-            "system property (note the camelCase)."),
-    THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("metastore.zookeeper.ssl.truststore.password",
-        "hive.zookeeper.ssl.truststore.password", "",
-        "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
-            "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
-            "system property (note the camelCase)."),
     THRIFT_URI_SELECTION("metastore.thrift.uri.selection", "hive.metastore.uri.selection", "RANDOM",
         new StringSetValidator("RANDOM", "SEQUENTIAL"),
         "Determines the selection mechanism used by metastore client to connect to remote " +
@@ -2046,22 +2018,14 @@ public class MetastoreConf {
   }
 
   public static ZooKeeperHiveHelper getZKConfig(Configuration conf) {
-    return ZooKeeperHiveHelper.builder()
-        .quorum(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS))
-        .clientPort(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT))
-        .serverRegistryNameSpace(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE))
-        .connectionTimeout((int) getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT,
-            TimeUnit.MILLISECONDS))
-        .sessionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
-            TimeUnit.MILLISECONDS))
-        .baseSleepTime((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
-            TimeUnit.MILLISECONDS))
-        .maxRetries(MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES))
-        .sslEnabled(MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_ENABLE))
-        .keyStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
-        .keyStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
-        .trustStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
-        .trustStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
+    return new ZooKeeperHiveHelper(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS),
+            MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT),
+            MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE),
+            (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
+                    TimeUnit.MILLISECONDS),
+            (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+                    TimeUnit.MILLISECONDS),
+            MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES));
   }
 
   /**
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
index 239bff6..b35dc7c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
@@ -46,16 +46,6 @@ public class MetastoreDelegationTokenManager {
   public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
       "hive.cluster.delegation.token.store.zookeeper.acl";
   public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation";
-  public static final String DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE =
-      "hive.cluster.delegation.token.store.zookeeper.ssl.client.enable";
-  public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION =
-      "hive.cluster.delegation.token.store.zookeeper.keystore.location";
-  public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD =
-      "hive.cluster.delegation.token.store.zookeeper.keystore.password";
-  public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION =
-      "hive.cluster.delegation.token.store.zookeeper.truststore.location";
-  public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD =
-      "hive.cluster.delegation.token.store.zookeeper.truststore.password";
 
   public MetastoreDelegationTokenManager() {
   }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
index dd2af7e..af52fcc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
@@ -29,8 +29,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -57,22 +57,12 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
   protected static final String ZK_SEQ_FORMAT = "%010d";
   private static final String NODE_KEYS = "/keys";
   private static final String NODE_TOKENS = "/tokens";
-  private static final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
-      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
 
   private String rootNode = "";
   private volatile CuratorFramework zkSession;
   private String zkConnectString;
   private int connectTimeoutMillis;
-  private boolean sslEnabled;
-  private String keyStoreLocation;
-  private String keyStorePassword;
-  private String trustStoreLocation;
-  private String trustStorePassword;
-
   private List<ACL> newNodeAcl;
-  private Configuration conf;
-  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
 
   /**
    * ACLProvider permissions will be used in case parent dirs need to be created
@@ -120,7 +110,12 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
     }
   }
 
+  private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
+      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+
+  private Configuration conf;
 
+  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
 
   /**
    * Default constructor for dynamic instantiation w/ Configurable
@@ -129,22 +124,14 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
   protected ZooKeeperTokenStore() {
   }
 
-  public CuratorFramework getSession() {
+  private CuratorFramework getSession() {
     if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
       synchronized (this) {
         if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
-          ZooKeeperHiveHelper zkHelper = ZooKeeperHiveHelper.builder()
-              .quorum(zkConnectString)
-              .connectionTimeout(connectTimeoutMillis)
-              .maxRetries(3)
-              .baseSleepTime(1000)
-              .sslEnabled(sslEnabled)
-              .keyStoreLocation(keyStoreLocation)
-              .keyStorePassword(keyStorePassword)
-              .trustStoreLocation(trustStoreLocation)
-              .trustStorePassword(trustStorePassword)
-              .build();
-          zkSession = zkHelper.getNewZookeeperClient(aclDefaultProvider);
+          zkSession =
+              CuratorFrameworkFactory.builder().connectString(zkConnectString)
+                  .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
+                  .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
           zkSession.start();
         }
       }
@@ -491,14 +478,10 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
             + WHEN_ZK_DSTORE_MSG);
       }
     }
-    connectTimeoutMillis = conf.getInt(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
-        CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
-
-    sslEnabled = conf.getBoolean(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, false);
-    keyStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION, "");
-    keyStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD, "");
-    trustStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION, "");
-    trustStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD, "");
+    connectTimeoutMillis =
+        conf.getInt(
+            MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+            CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
 
     String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null);
     this.newNodeAcl = StringUtils.isNotBlank(aclStr)? parseACLs(aclStr) : getDefaultAcl(conf);