You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/03/26 08:25:06 UTC

[hive] branch master updated: HIVE-23045: Zookeeper SSL/TLS support (Peter Varga via Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new cb4427e  HIVE-23045: Zookeeper SSL/TLS support (Peter Varga via Denys Kuzmenko)
cb4427e is described below

commit cb4427e1eaaf27029a397acadc62b8dddcb0437e
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Thu Mar 26 09:22:53 2020 +0100

    HIVE-23045: Zookeeper SSL/TLS support (Peter Varga via Denys Kuzmenko)
---
 HIVE-23045.10.patch                                | 1742 ++++++++++++++++++++
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   69 +-
 .../hcatalog/templeton/tool/ZooKeeperStorage.java  |   40 +-
 .../security/TestZookeeperTokenStorePlain.java     |   35 +
 .../TestZookeeperTokenStoreSSLEnabled.java         |   35 +
 ...Store.java => ZooKeeperTokenStoreTestBase.java} |   71 +-
 .../org/apache/hive/jdbc/TestRestrictedList.java   |    4 +
 .../org/apache/hive/jdbc/TestServiceDiscovery.java |    8 +-
 ...=> InformationSchemaWithPrivilegeTestBase.java} |   40 +-
 ...formationSchemaWithPrivilegeZookeeperPlain.java |   35 +
 ...InformationSchemaWithPrivilegeZookeeperSSL.java |   35 +
 jdbc/src/java/org/apache/hive/jdbc/Utils.java      |   57 +-
 .../hive/jdbc/ZooKeeperHiveClientHelper.java       |   34 +-
 .../hadoop/hive/registry/impl/ZkRegistryBase.java  |   52 +-
 .../zookeeper/CuratorFrameworkSingleton.java       |   12 +-
 .../hive/testutils/MiniZooKeeperCluster.java       |   72 +-
 .../apache/hive/service/server/HiveServer2.java    |   11 +-
 .../hadoop/hive/common/SSLZookeeperFactory.java    |   78 +
 .../hadoop/hive/common/ZooKeeperHiveHelper.java    |  227 ++-
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   66 +-
 .../security/MetastoreDelegationTokenManager.java  |   10 +
 .../metastore/security/ZooKeeperTokenStore.java    |   47 +-
 22 files changed, 2585 insertions(+), 195 deletions(-)

diff --git a/HIVE-23045.10.patch b/HIVE-23045.10.patch
new file mode 100644
index 0000000..ce2fd49
--- /dev/null
+++ b/HIVE-23045.10.patch
@@ -0,0 +1,1742 @@
+diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+index d50912b4e2..34df01e60e 100644
+--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
++++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+@@ -2672,6 +2672,25 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
+         new TimeValidator(TimeUnit.MILLISECONDS),
+         "Initial amount of time (in milliseconds) to wait between retries\n" +
+         "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
++    HIVE_ZOOKEEPER_SSL_ENABLE("hive.zookeeper.ssl.client.enable", false,
++        "Set client to use TLS when connecting to ZooKeeper.  An explicit value overrides any value set via the " +
++            "zookeeper.client.secure system property (note the different name).  Defaults to false if neither is set."),
++    HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION("hive.zookeeper.ssl.keystore.location", "",
++        "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++            "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
++            "system property (note the camelCase)."),
++    HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("hive.zookeeper.ssl.keystore.password", "",
++        "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++            "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
++             "system property (note the camelCase)."),
++    HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("hive.zookeeper.ssl.truststore.location", "",
++        "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++            "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
++            "system property (note the camelCase)."),
++    HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("hive.zookeeper.ssl.truststore.password", "",
++        "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++            "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
++             "system property (note the camelCase)."),
+ 
+     // Transactions
+     HIVE_TXN_MANAGER("hive.txn.manager",
+@@ -4795,14 +4814,18 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
+             "hive.spark.client.rpc.server.address," +
+             "hive.spark.client.rpc.server.port," +
+             "hive.spark.client.rpc.sasl.mechanisms," +
+-            "bonecp.,"+
+-            "hive.druid.broker.address.default,"+
+-            "hive.druid.coordinator.address.default,"+
+-            "hikaricp.,"+
+-            "hadoop.bin.path,"+
+-            "yarn.bin.path,"+
+-            "spark.home,"+
+-            "hive.driver.parallel.compilation.global.limit",
++            "bonecp.," +
++            "hive.druid.broker.address.default," +
++            "hive.druid.coordinator.address.default," +
++            "hikaricp.," +
++            "hadoop.bin.path," +
++            "yarn.bin.path," +
++            "spark.home," +
++            "hive.driver.parallel.compilation.global.limit," +
++            "hive.zookeeper.ssl.keystore.location," +
++            "hive.zookeeper.ssl.keystore.password," +
++            "hive.zookeeper.ssl.truststore.location," +
++            "hive.zookeeper.ssl.truststore.password",
+         "Comma separated list of configuration options which are immutable at runtime"),
+     HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
+         METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname
+@@ -4817,7 +4840,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
+         + ",fs.s3a.proxy.password"
+         + ",dfs.adls.oauth2.credential"
+         + ",fs.adl.oauth2.credential"
+-        + ",fs.azure.account.oauth2.client.secret",
++        + ",fs.azure.account.oauth2.client.secret"
++        + ",hive.zookeeper.ssl.keystore.location"
++        + ",hive.zookeeper.ssl.keystore.password"
++        + ",hive.zookeeper.ssl.truststore.location"
++        + ",hive.zookeeper.ssl.truststore.password",
+         "Comma separated list of configuration options which should not be read by normal user like passwords"),
+     HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
+         "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
+@@ -5619,14 +5646,22 @@ public void logVars(PrintStream ps) {
+    * given HiveConf.
+    */
+   public ZooKeeperHiveHelper getZKConfig() {
+-    return new ZooKeeperHiveHelper(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM),
+-            getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT),
+-            getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE),
+-            (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+-                    TimeUnit.MILLISECONDS),
+-            (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+-                    TimeUnit.MILLISECONDS),
+-            getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES));
++    return ZooKeeperHiveHelper.builder()
++      .quorum(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM))
++      .clientPort(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT))
++      .serverRegistryNameSpace(getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE))
++      .connectionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
++          TimeUnit.MILLISECONDS))
++      .sessionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
++          TimeUnit.MILLISECONDS))
++      .baseSleepTime((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
++          TimeUnit.MILLISECONDS))
++      .maxRetries(getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
++      .sslEnabled(getBoolVar(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
++      .keyStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
++      .keyStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
++      .trustStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
++      .trustStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
+   }
+ 
+   public HiveConf() {
+diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
+index 1fc8d36394..02a8926ed5 100644
+--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
++++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
+@@ -22,11 +22,11 @@
+ import java.util.ArrayList;
+ import java.util.List;
+ 
++import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+@@ -50,8 +50,12 @@
+   public String overhead_path = null;
+ 
+   public static final String ZK_HOSTS = "templeton.zookeeper.hosts";
+-  public static final String ZK_SESSION_TIMEOUT
+-    = "templeton.zookeeper.session-timeout";
++  public static final String ZK_SESSION_TIMEOUT = "templeton.zookeeper.session-timeout";
++  public static final String ZK_SSL_ENABLE = "templeton.zookeeper.ssl.client.enable";
++  public static final String ZK_KEYSTORE_LOCATION = "templeton.zookeeper.keystore.location";
++  public static final String ZK_KEYSTORE_PASSWORD = "templeton.zookeeper.keystore.password";
++  public static final String ZK_TRUSTSTORE_LOCATION = "templeton.zookeeper.truststore.location";
++  public static final String ZK_TRUSTSTORE_PASSWORD = "templeton.zookeeper.truststore.password";
+ 
+   public static final String ENCODING = "UTF-8";
+ 
+@@ -59,27 +63,25 @@
+ 
+   private CuratorFramework zk;
+ 
+-  /**
+-   * Open a ZooKeeper connection for the JobState.
+-   */
+-  public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
+-    throws IOException {
+-    //do we need to add a connection status listener?  What will that do?
+-    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+-    CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
+-      CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
+-    zk.start();
+-    return zk;
+-  }
+ 
+   /**
+    * Open a ZooKeeper connection for the JobState.
+    */
+   public static CuratorFramework zkOpen(Configuration conf) throws IOException {
+-    /*the silly looking call to Builder below is to get the default value of session timeout
+-    from Curator which itself exposes it as system property*/
+-    return zkOpen(conf.get(ZK_HOSTS),
+-      conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
++    ZooKeeperHiveHelper zkHelper = ZooKeeperHiveHelper.builder()
++        .quorum(conf.get(ZK_HOSTS))
++        .sessionTimeout(conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()))
++        .baseSleepTime(1000) // 1000 and 3: same values as the former ExponentialBackoffRetry(1000, 3)
++        .maxRetries(3)
++        .sslEnabled(conf.getBoolean(ZK_SSL_ENABLE, false))
++        .keyStoreLocation(conf.get(ZK_KEYSTORE_LOCATION, ""))
++        .keyStorePassword(conf.get(ZK_KEYSTORE_PASSWORD, ""))
++        .trustStoreLocation(conf.get(ZK_TRUSTSTORE_LOCATION, ""))
++        .trustStorePassword(conf.get(ZK_TRUSTSTORE_PASSWORD, ""))
++        .build();
++    CuratorFramework zk = zkHelper.getNewZookeeperClient();
++    zk.start();
++    return zk;
+   }
+ 
+   public ZooKeeperStorage() {
+diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
+new file mode 100644
+index 0000000000..084e097be9
+--- /dev/null
++++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.metastore.security;
++
++import org.junit.BeforeClass;
++
++/**
++ * Runs the tests inherited from ZooKeeperTokenStoreTestBase with ZooKeeper SSL communication disabled.
++ */
++public class TestZookeeperTokenStorePlain extends ZooKeeperTokenStoreTestBase {
++
++  public TestZookeeperTokenStorePlain(){
++    super();
++  }
++
++  @BeforeClass
++  public static void setUp() throws Exception {
++    setUpInternal(false); // false: the base class starts its MiniZooKeeperCluster without SSL
++  }
++}
+diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
+new file mode 100644
+index 0000000000..40179901ce
+--- /dev/null
++++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.metastore.security;
++
++import org.junit.BeforeClass;
++
++/**
++ * Runs the tests inherited from ZooKeeperTokenStoreTestBase with ZooKeeper SSL communication enabled.
++ */
++public class TestZookeeperTokenStoreSSLEnabled extends ZooKeeperTokenStoreTestBase {
++
++  public TestZookeeperTokenStoreSSLEnabled(){
++    super();
++  }
++
++  @BeforeClass
++  public static void setUp() throws Exception {
++    setUpInternal(true); // true: SSL-enabled MiniZooKeeperCluster; createConf() then adds the SSL client settings
++  }
++}
+diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
+similarity index 76%
+rename from itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
+rename to itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
+index 603155bf8f..35053e70b0 100644
+--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
++++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
+@@ -25,67 +25,84 @@
+ 
+ 
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.CuratorFrameworkFactory;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
+-import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+ import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+ import org.apache.hive.testutils.MiniZooKeeperCluster;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.data.ACL;
++import org.junit.AfterClass;
++import org.junit.After;
+ import org.junit.Assert;
++import org.junit.Test;
++
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotSame;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.assertNull;
+ import static org.junit.Assert.fail;
+-import org.junit.Before;
+-import org.junit.After;
+-import org.junit.Test;
+ 
+ /**
+  * TestZooKeeperTokenStore.
+  */
+-public class TestZooKeeperTokenStore {
++public abstract class ZooKeeperTokenStoreTestBase {
++
++  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
++  private static final String TRUST_STORE_NAME = "truststore.jks";
++  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
+ 
+-  private MiniZooKeeperCluster zkCluster = null;
+-  private CuratorFramework zkClient = null;
+-  private int zkPort = -1;
+-  private ZooKeeperTokenStore ts;
++  private static MiniZooKeeperCluster zkCluster = null;
++  private static int zkPort = -1;
++  private static ZooKeeperTokenStore ts;
++  private static boolean zkSslEnabled;
+ 
+-  @Before
+-  public void setUp() throws Exception {
++  public static void setUpInternal(boolean sslEnabled) throws Exception{
+     File zkDataDir = new File(System.getProperty("test.tmp.dir"));
+-    if (this.zkCluster != null) {
++    if (zkCluster != null) {
+       throw new IOException("Cluster already running");
+     }
+-    this.zkCluster = new MiniZooKeeperCluster();
+-    this.zkPort = this.zkCluster.startup(zkDataDir);
+-    this.zkClient =
+-        CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort)
+-            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+-    this.zkClient.start();
++    zkCluster = new MiniZooKeeperCluster(sslEnabled);
++    zkPort = zkCluster.startup(zkDataDir);
++    zkSslEnabled = sslEnabled;
++  }
++
++  @AfterClass
++  public static void tearDown() throws Exception{
++    zkCluster.shutdown();
++    zkCluster = null;
+   }
+ 
+   @After
+-  public void tearDown() throws Exception {
+-    this.zkClient.close();
++  public void closeTokenStore() throws Exception{
+     if (ts != null) {
+       ts.close();
+     }
+-    this.zkCluster.shutdown();
+-    this.zkCluster = null;
+   }
+ 
+   private Configuration createConf(String zkPath) {
+     Configuration conf = new Configuration();
+     conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:"
+-        + this.zkPort);
++        + zkPort);
+     conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath);
++    if(zkSslEnabled) {
++      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
++          System.getProperty("test.data.files") :
++          (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
++      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION,
++          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
++      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD,
++          KEY_STORE_TRUST_STORE_PASSWORD);
++      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION,
++          dataFileDir + File.separator + TRUST_STORE_NAME);
++      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD,
++          KEY_STORE_TRUST_STORE_PASSWORD);
++      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, "true");
++
++    }
+     return conf;
+   }
+ 
+@@ -98,6 +115,7 @@ public void testTokenStorage() throws Exception {
+     ts.setConf(conf);
+     ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
+ 
++    CuratorFramework zkClient = ts.getSession();
+ 
+     String metastore_zk_path = ZK_PATH + ServerMode.METASTORE;
+     int keySeq = ts.addMasterKey("key1Data");
+@@ -112,6 +130,7 @@ public void testTokenStorage() throws Exception {
+ 
+     ts.removeMasterKey(keySeq);
+     assertEquals("expected number keys", 1, ts.getMasterKeys().length);
++    ts.removeMasterKey(keySeq2);
+ 
+     // tokens
+     DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(
+@@ -189,6 +208,8 @@ public void testAclPositive() throws Exception {
+     ts = new ZooKeeperTokenStore();
+     ts.setConf(conf);
+     ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
++
++    CuratorFramework zkClient = ts.getSession();
+     List<ACL> acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE);
+     assertEquals(2, acl.size());
+   }
+diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+index cc32a7e2b8..596c3d6fc1 100644
+--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
++++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+@@ -111,6 +111,10 @@ public static void startServices() throws Exception {
+     addToExpectedRestrictedMap("spark.home");
+     addToExpectedRestrictedMap("hive.privilege.synchronizer.interval");
+     addToExpectedRestrictedMap("hive.driver.parallel.compilation.global.limit");
++    addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.location");
++    addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.password");
++    addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.location");
++    addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.password");
+   }
+ 
+   @AfterClass
+diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
+index bd5e811743..3322434cb6 100644
+--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
++++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
+@@ -22,11 +22,10 @@
+ import com.google.common.collect.Collections2;
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
++import org.apache.curator.framework.recipes.nodes.PersistentNode;
+ import org.apache.curator.retry.RetryOneTime;
+ import org.apache.curator.test.TestingServer;
+ import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+-import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.junit.After;
+@@ -165,9 +164,8 @@ private void publishConfsToZk(Map<String, String> confs, String uri) throws Exce
+     // Publish configs for this instance as the data on the node
+     znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confs);
+     byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
+-    PersistentEphemeralNode znode =
+-      new PersistentEphemeralNode(client,
+-        PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
++    PersistentNode znode = new PersistentNode(client, CreateMode.EPHEMERAL_SEQUENTIAL,
++        false, pathPrefix, znodeDataUTF8);
+     znode.start();
+     // We'll wait for 120s for node creation
+     long znodeCreationTimeout = 120;
+diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
+similarity index 95%
+rename from itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
+rename to itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
+index de2e4937a8..7302e0993a 100644
+--- itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
++++ itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
+@@ -19,6 +19,7 @@
+ package org.apache.hive.service.server;
+ 
+ import java.io.File;
++import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.Iterator;
+@@ -54,14 +55,14 @@
+ import org.apache.hive.service.cli.OperationHandle;
+ import org.apache.hive.service.cli.RowSet;
+ import org.apache.hive.service.cli.SessionHandle;
++import org.junit.AfterClass;
+ import org.junit.Assert;
+-import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ /**
+  * Test restricted information schema with privilege synchronization
+  */
+-public class TestInformationSchemaWithPrivilege {
++public abstract class InformationSchemaWithPrivilegeTestBase {
+ 
+   // Group mapping:
+   // group_a: user1, user2
+@@ -175,14 +176,18 @@ public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory metastoreC
+     }
+   }
+ 
++  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
++  private static final String TRUST_STORE_NAME = "truststore.jks";
++  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
++
+   private static MiniHS2 miniHS2 = null;
+   private static MiniZooKeeperCluster zkCluster = null;
+   private static Map<String, String> confOverlay;
+ 
+-  @BeforeClass
+-  public static void beforeTest() throws Exception {
++
++  public static void setupInternal(boolean zookeeperSSLEnabled) throws Exception {
+     File zkDataDir = new File(System.getProperty("test.tmp.dir"));
+-    zkCluster = new MiniZooKeeperCluster();
++    zkCluster = new MiniZooKeeperCluster(zookeeperSSLEnabled);
+     int zkPort = zkCluster.startup(zkDataDir);
+ 
+     miniHS2 = new MiniHS2(new HiveConf());
+@@ -206,9 +211,34 @@ public static void beforeTest() throws Exception {
+     confOverlay.put(ConfVars.HIVE_AUTHENTICATOR_MANAGER.varname, FakeGroupAuthenticator.class.getName());
+     confOverlay.put(ConfVars.HIVE_AUTHORIZATION_ENABLED.varname, "true");
+     confOverlay.put(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST.varname, ".*");
++
++    if(zookeeperSSLEnabled) {
++      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
++          System.getProperty("test.data.files") :
++          (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
++      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION.varname,
++          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
++      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
++          KEY_STORE_TRUST_STORE_PASSWORD);
++      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION.varname,
++          dataFileDir + File.separator + TRUST_STORE_NAME);
++      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
++          KEY_STORE_TRUST_STORE_PASSWORD);
++      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE.varname, "true");
++    }
+     miniHS2.start(confOverlay);
+   }
+ 
++  @AfterClass
++  public static void tearDown() throws IOException {
++    if (miniHS2 != null) {
++      miniHS2.stop();
++    }
++    if (zkCluster != null) {
++      zkCluster.shutdown();
++    }
++  }
++
+   @Test
+   public void test() throws Exception {
+ 
+diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
+new file mode 100644
+index 0000000000..ffa1843ae2
+--- /dev/null
++++ itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hive.service.server;
++
++import org.junit.BeforeClass;
++
++/**
++ * Runs the restricted information schema privilege synchronization test with ZooKeeper SSL disabled.
++ */
++public class TestInformationSchemaWithPrivilegeZookeeperPlain extends InformationSchemaWithPrivilegeTestBase {
++
++  public TestInformationSchemaWithPrivilegeZookeeperPlain() {
++    super();
++  }
++
++  @BeforeClass
++  public static void setUp() throws Exception{
++    setupInternal(false); // false: the MiniZooKeeperCluster is started without SSL
++  }
++}
+diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
+new file mode 100644
+index 0000000000..e12f4948d4
+--- /dev/null
++++ itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hive.service.server;
++
++import org.junit.BeforeClass;
++
++/**
++ * Tests the restricted information schema with privilege synchronization; ZooKeeper SSL communication is enabled.
++ */
++public class TestInformationSchemaWithPrivilegeZookeeperSSL extends InformationSchemaWithPrivilegeTestBase {
++
++  public TestInformationSchemaWithPrivilegeZookeeperSSL() {
++    super();
++  }
++
++  @BeforeClass
++  public static void setUp() throws Exception{
++    setupInternal(true);
++  }
++}
+diff --git jdbc/src/java/org/apache/hive/jdbc/Utils.java jdbc/src/java/org/apache/hive/jdbc/Utils.java
+index e23826eb56..6cb6853077 100644
+--- jdbc/src/java/org/apache/hive/jdbc/Utils.java
++++ jdbc/src/java/org/apache/hive/jdbc/Utils.java
+@@ -116,6 +116,11 @@
+     public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
+     public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA";
+     public static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
++    public static final String ZOOKEEPER_SSL_ENABLE = "zooKeeperSSLEnable";
++    public static final String ZOOKEEPER_KEYSTORE_LOCATION = "zooKeeperKeystoreLocation";
++    public static final String ZOOKEEPER_KEYSTORE_PASSWORD= "zooKeeperKeystorePassword";
++    public static final String ZOOKEEPER_TRUSTSTORE_LOCATION  = "zooKeeperTruststoreLocation";
++    public static final String ZOOKEEPER_TRUSTSTORE_PASSWORD = "zooKeeperTruststorePassword";
+     // Default namespace value on ZooKeeper.
+     // This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
+     static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
+@@ -168,6 +173,11 @@
+     private boolean isEmbeddedMode = false;
+     private String suppliedURLAuthority;
+     private String zooKeeperEnsemble = null;
++    private boolean zooKeeperSslEnabled = false;
++    private String zookeeperKeyStoreLocation = "";
++    private String zookeeperKeyStorePassword = "";
++    private String zookeeperTrustStoreLocation = "";
++    private String zookeeperTrustStorePassword = "";
+     private String currentHostZnodePath;
+     private final List<String> rejectedHostZnodePaths = new ArrayList<String>();
+ 
+@@ -185,6 +195,12 @@ public JdbcConnectionParams(JdbcConnectionParams params) {
+       this.isEmbeddedMode = params.isEmbeddedMode;
+       this.suppliedURLAuthority = params.suppliedURLAuthority;
+       this.zooKeeperEnsemble = params.zooKeeperEnsemble;
++      this.zooKeeperSslEnabled = params.zooKeeperSslEnabled;
++      this.zookeeperKeyStoreLocation = params.zookeeperKeyStoreLocation;
++      this.zookeeperKeyStorePassword = params.zookeeperKeyStorePassword;
++      this.zookeeperTrustStoreLocation = params.zookeeperTrustStoreLocation;
++      this.zookeeperTrustStorePassword = params.zookeeperTrustStorePassword;
++
+       this.currentHostZnodePath = params.currentHostZnodePath;
+       this.rejectedHostZnodePaths.addAll(rejectedHostZnodePaths);
+     }
+@@ -228,6 +244,25 @@ public String getSuppliedURLAuthority() {
+     public String getZooKeeperEnsemble() {
+       return zooKeeperEnsemble;
+     }
++    public boolean isZooKeeperSslEnabled() {
++      return zooKeeperSslEnabled;
++    }
++
++    public String getZookeeperKeyStoreLocation() {
++      return zookeeperKeyStoreLocation;
++    }
++
++    public String getZookeeperKeyStorePassword() {
++      return zookeeperKeyStorePassword;
++    }
++
++    public String getZookeeperTrustStoreLocation() {
++      return zookeeperTrustStoreLocation;
++    }
++
++    public String getZookeeperTrustStorePassword() {
++      return zookeeperTrustStorePassword;
++    }
+ 
+     public List<String> getRejectedHostZnodePaths() {
+       return rejectedHostZnodePaths;
+@@ -277,6 +312,26 @@ public void setZooKeeperEnsemble(String zooKeeperEnsemble) {
+       this.zooKeeperEnsemble = zooKeeperEnsemble;
+     }
+ 
++    public void setZooKeeperSslEnabled(boolean zooKeeperSslEnabled) {
++      this.zooKeeperSslEnabled = zooKeeperSslEnabled;
++    }
++
++    public void setZookeeperKeyStoreLocation(String zookeeperKeyStoreLocation) {
++      this.zookeeperKeyStoreLocation = zookeeperKeyStoreLocation;
++    }
++
++    public void setZookeeperKeyStorePassword(String zookeeperKeyStorePassword) {
++      this.zookeeperKeyStorePassword = zookeeperKeyStorePassword;
++    }
++
++    public void setZookeeperTrustStoreLocation(String zookeeperTrustStoreLocation) {
++      this.zookeeperTrustStoreLocation = zookeeperTrustStoreLocation;
++    }
++
++    public void setZookeeperTrustStorePassword(String zookeeperTrustStorePassword) {
++      this.zookeeperTrustStorePassword = zookeeperTrustStorePassword;
++    }
++
+     public void setCurrentHostZnodePath(String currentHostZnodePath) {
+       this.currentHostZnodePath = currentHostZnodePath;
+     }
+@@ -485,6 +540,7 @@ public static JdbcConnectionParams extractURLComponents(String uri, Properties i
+         uri = uri.replace(dummyAuthorityString, authorityStr);
+         // Set ZooKeeper ensemble in connParams for later use
+         connParams.setZooKeeperEnsemble(authorityStr);
++        ZooKeeperHiveClientHelper.setZkSSLParams(connParams);
+       } else {
+         URI jdbcBaseURI = URI.create(URI_HIVE_PREFIX + "//" + authorityStr);
+         // Check to prevent unintentional use of embedded mode. A missing "/"
+@@ -576,7 +632,6 @@ private static void handleParamDeprecation(Map<String, String> fromMap, Map<Stri
+    * host:port pairs.
+    *
+    * @param uri
+-   * @param connParams
+    * @return
+    * @throws JdbcUriParseException
+    */
+diff --git jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+index 759ba8a5ef..3d89fa223a 100644
+--- jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
++++ jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+@@ -32,6 +32,7 @@
+ import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.curator.utils.ZKPaths;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.common.SSLZookeeperFactory;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
+ import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+@@ -85,11 +86,40 @@ public static boolean isZkDynamicDiscoveryMode(Map<String, String> sessionConf)
+       JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode));
+   }
+ 
++  /**
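++   * Parse the ZooKeeper SSL params from the session vars of connParams; store settings are set only if SSL is enabled.
++   * @param connParams connection parameters; read via getSessionVars() and updated through its setters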
++   */
++  public static void setZkSSLParams(JdbcConnectionParams connParams) {
++    Map<String, String> sessionConf = connParams.getSessionVars();
++    boolean sslEnabled = false;
++    if (sessionConf.containsKey(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE)) {
++      sslEnabled = Boolean.parseBoolean(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE));
++      connParams.setZooKeeperSslEnabled(sslEnabled);
++    }
++    if (sslEnabled) {
++      connParams.setZookeeperKeyStoreLocation(
++          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_LOCATION), ""));
++      connParams.setZookeeperKeyStorePassword(
++          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_PASSWORD), ""));
++      connParams.setZookeeperTrustStoreLocation(
++          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_LOCATION), ""));
++      connParams.setZookeeperTrustStorePassword(
++          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_PASSWORD), ""));
++    }
++  }
++
+   private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception {
+     String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
+     CuratorFramework zooKeeperClient =
+-        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+-            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
++        CuratorFrameworkFactory.builder()
++            .connectString(zooKeeperEnsemble)
++            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
++            .zookeeperFactory(
++            new SSLZookeeperFactory(connParams.isZooKeeperSslEnabled(), connParams.getZookeeperKeyStoreLocation(),
++                connParams.getZookeeperKeyStorePassword(), connParams.getZookeeperTrustStoreLocation(),
++                connParams.getZookeeperTrustStorePassword()))
++            .build();
+     zooKeeperClient.start();
+     return zooKeeperClient;
+   }
+diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+index d28fd1778c..2b21baaea4 100644
+--- llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
++++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+@@ -30,20 +30,18 @@
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.api.ACLProvider;
+ import org.apache.curator.framework.imps.CuratorFrameworkState;
+ import org.apache.curator.framework.recipes.cache.ChildData;
+ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
++import org.apache.curator.framework.recipes.nodes.PersistentNode;
+ import org.apache.curator.framework.state.ConnectionState;
+ import org.apache.curator.framework.state.ConnectionStateListener;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.curator.utils.CloseableUtils;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+ import org.apache.hadoop.hive.llap.LlapUtil;
+@@ -55,6 +53,7 @@
+ import org.apache.hadoop.registry.client.types.ServiceRecord;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.yarn.conf.YarnConfiguration;
++import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException.InvalidACLException;
+ import org.apache.zookeeper.KeeperException.NodeExistsException;
+ import org.apache.zookeeper.ZooDefs;
+@@ -111,7 +110,7 @@
+   private final Map<String, Set<InstanceType>> nodeToInstanceCache;
+ 
+   // The registration znode.
+-  private PersistentEphemeralNode znode;
++  private PersistentNode znode;
+   private String znodePath; // unique identity for this instance
+ 
+   private PathChildrenCache instancesCache; // Created on demand.
+@@ -212,28 +211,23 @@ private ACLProvider getACLProviderForZKPath(String zkPath) {
+   }
+ 
+   private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
+-    String zkEnsemble = getQuorumServers(conf);
+-    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+-      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+-    int connectionTimeout = (int) HiveConf.getTimeVar(conf,
+-      ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+-    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+-      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+-    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+-
+-    LOG.info("Creating curator client with connectString: {} sessionTimeoutMs: {} connectionTimeoutMs: {}" +
+-      " namespace: {} exponentialBackoff - sleepTime: {} maxRetries: {}", zkEnsemble, sessionTimeout,
+-      connectionTimeout, namespace, baseSleepTime, maxRetries);
+-    // Create a CuratorFramework instance to be used as the ZooKeeper client
+-    // Use the zooKeeperAclProvider to create appropriate ACLs
+-    return CuratorFrameworkFactory.builder()
+-      .connectString(zkEnsemble)
+-      .sessionTimeoutMs(sessionTimeout)
+-      .connectionTimeoutMs(connectionTimeout)
+-      .aclProvider(zooKeeperAclProvider)
+-      .namespace(namespace)
+-      .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+-      .build();
++    return ZooKeeperHiveHelper.builder()
++        .quorum(conf.get(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname))
++        .clientPort(conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
++            ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()))
++        .connectionTimeout(
++            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
++        .sessionTimeout(
++            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS))
++        .baseSleepTime(
++            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS))
++        .maxRetries(HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
++        .sslEnabled(HiveConf.getBoolVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
++        .keyStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
++        .keyStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
++        .trustStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
++        .trustStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD))
++        .build().getNewZookeeperClient(zooKeeperAclProvider, namespace);
+   }
+ 
+   private static List<ACL> createSecureAcls() {
+@@ -283,9 +277,9 @@ protected final String registerServiceRecord(ServiceRecord srv, final String uni
+ 
+     // Create a znode under the rootNamespace parent for this instance of the server
+     try {
+-      // PersistentEphemeralNode will make sure the ephemeral node created on server will be present
++      // PersistentNode will make sure the ephemeral node created on server will be present
+       // even under connection or session interruption (will automatically handle retries)
+-      znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
++      znode = new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false,
+           workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
+     
+       // start the creation of znodes
+diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
+index fa3a382367..e8eaac0fd9 100644
+--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
++++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
+@@ -18,14 +18,11 @@
+ 
+ package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
+ 
+-import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.hive.common.util.ShutdownHookManager;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.CuratorFrameworkFactory;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ 
+ public class CuratorFrameworkSingleton {
+@@ -50,15 +47,8 @@ public static synchronized CuratorFramework getInstance(HiveConf hiveConf) {
+       } else {
+         conf = hiveConf;
+       }
+-      int sessionTimeout =  (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+-      int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+-      int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+-      String quorumServers = conf.getZKConfig().getQuorumServers();
+ 
+-      sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers)
+-          .sessionTimeoutMs(sessionTimeout)
+-          .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+-          .build();
++      sharedClient = conf.getZKConfig().getNewZookeeperClient();
+       sharedClient.start();
+     }
+ 
+diff --git ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
+index eec628263a..dc11ae1611 100644
+--- ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
++++ ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
+@@ -34,7 +34,9 @@
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hbase.HConstants;
+-import org.apache.zookeeper.server.NIOServerCnxnFactory;
++import org.apache.zookeeper.common.ClientX509Util;
++import org.apache.zookeeper.common.X509Util;
++import org.apache.zookeeper.server.ServerCnxnFactory;
+ import org.apache.zookeeper.server.ZooKeeperServer;
+ import org.apache.zookeeper.server.persistence.FileTxnLog;
+ import org.slf4j.Logger;
+@@ -54,6 +56,9 @@
+ 
+   private static final int TICK_TIME = 2000;
+   private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
++  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
++  private static final String TRUST_STORE_NAME = "truststore.jks";
++  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
+   private int connectionTimeout;
+ 
+   private boolean started;
+@@ -61,7 +66,7 @@
+   /** The default port. If zero, we use a random port. */
+   private int defaultClientPort = 0;
+ 
+-  private List<NIOServerCnxnFactory> standaloneServerFactoryList;
++  private List<ServerCnxnFactory> standaloneServerFactoryList;
+   private List<ZooKeeperServer> zooKeeperServers;
+   private List<Integer> clientPortList;
+ 
+@@ -70,11 +75,20 @@
+ 
+   private Configuration configuration;
+ 
++  private boolean sslEnabled = false;
++
+   public MiniZooKeeperCluster() {
+     this(new Configuration());
+   }
+-
++  public MiniZooKeeperCluster(boolean sslEnabled) {
++    this(new Configuration(), sslEnabled);
++  }
+   public MiniZooKeeperCluster(Configuration configuration) {
++    this(configuration, false);
++  }
++
++  public MiniZooKeeperCluster(Configuration configuration, boolean sslEnabled) {
++    this.sslEnabled = sslEnabled;
+     this.started = false;
+     this.configuration = configuration;
+     activeZKServerIndex = -1;
+@@ -167,7 +181,7 @@ public int getZooKeeperServerNum() {
+   }
+ 
+   // / XXX: From o.a.zk.t.ClientBase
+-  private static void setupTestEnv() {
++  private static void setupTestEnv(boolean sslEnabled) {
+     // With ZooKeeper 3.5 we need to whitelist the 4 letter commands we use
+     System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+ 
+@@ -176,6 +190,9 @@ private static void setupTestEnv() {
+     // resulting in test failure (client timeout on first session).
+     // set env and directly in order to handle static init/gc issues
+     System.setProperty("zookeeper.preAllocSize", "100");
++    if (sslEnabled) {
++      System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
++    }
+     FileTxnLog.setPreallocSize(100 * 1024);
+   }
+ 
+@@ -200,7 +217,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
+       return -1;
+     }
+ 
+-    setupTestEnv();
++    setupTestEnv(sslEnabled);
+     shutdown();
+ 
+     int tentativePort = -1; // the seed port
+@@ -229,12 +246,10 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
+       // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
+       server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
+       server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
+-      NIOServerCnxnFactory standaloneServerFactory;
++      ServerCnxnFactory standaloneServerFactory;
+       while (true) {
+         try {
+-          standaloneServerFactory = new NIOServerCnxnFactory();
+-          standaloneServerFactory.configure(new InetSocketAddress(currentClientPort),
+-              configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
++          standaloneServerFactory = createServerCnxnFactory(currentClientPort);
+         } catch (BindException e) {
+           LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
+           // We're told to use some port but it's occupied, fail
+@@ -252,7 +267,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
+       // Start up this ZK server
+       standaloneServerFactory.startup(server);
+       // Runs a 'stat' against the servers.
+-      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
++      if (!sslEnabled && !waitForServerUp(currentClientPort, connectionTimeout)) {
+         throw new IOException("Waiting for startup of standalone server");
+       }
+ 
+@@ -276,6 +291,36 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
+     return clientPort;
+   }
+ 
++  private ServerCnxnFactory createServerCnxnFactory(int currentClientPort) throws IOException {
++    ServerCnxnFactory serverCnxnFactory = null;
++    if (sslEnabled) {
++      System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
++          "org.apache.zookeeper.server.NettyServerCnxnFactory");
++      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
++              System.getProperty("test.data.files") :
++              configuration.get("test.data.files").replace('\\', '/').replace("c:", "");
++      X509Util x509Util = new ClientX509Util();
++      System.setProperty(x509Util.getSslKeystoreLocationProperty(),
++          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
++      System.setProperty(x509Util.getSslKeystorePasswdProperty(),
++          KEY_STORE_TRUST_STORE_PASSWORD);
++      System.setProperty(x509Util.getSslTruststoreLocationProperty(),
++          dataFileDir + File.separator + TRUST_STORE_NAME);
++      System.setProperty(x509Util.getSslTruststorePasswdProperty(),
++          KEY_STORE_TRUST_STORE_PASSWORD);
++      serverCnxnFactory = ServerCnxnFactory.createFactory();
++      serverCnxnFactory.configure(new InetSocketAddress(currentClientPort),
++          configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS),
++          true);
++    } else {
++      serverCnxnFactory = ServerCnxnFactory.createFactory();
++      serverCnxnFactory.configure(new InetSocketAddress(currentClientPort),
++          configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
++    }
++    return serverCnxnFactory;
++  }
++
++
+   private void createDir(File dir) throws IOException {
+     try {
+       if (!dir.exists()) {
+@@ -292,7 +337,7 @@ private void createDir(File dir) throws IOException {
+   public void shutdown() throws IOException {
+     // shut down all the zk servers
+     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+-      NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
++      ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
+       int clientPort = clientPortList.get(i);
+ 
+       standaloneServerFactory.shutdown();
+@@ -305,6 +350,7 @@ public void shutdown() throws IOException {
+     for (ZooKeeperServer zkServer : zooKeeperServers) {
+       //explicitly close ZKDatabase since ZookeeperServer does not close them
+       zkServer.getZKDatabase().close();
++      zkServer.shutdown(true);
+     }
+     zooKeeperServers.clear();
+ 
+@@ -328,7 +374,7 @@ public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedExc
+     }
+ 
+     // Shutdown the current active one
+-    NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
++    ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
+     int clientPort = clientPortList.get(activeZKServerIndex);
+ 
+     standaloneServerFactory.shutdown();
+@@ -366,7 +412,7 @@ public void killOneBackupZooKeeperServer() throws IOException, InterruptedExcept
+ 
+     int backupZKServerIndex = activeZKServerIndex + 1;
+     // Shutdown the current active one
+-    NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
++    ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
+     int clientPort = clientPortList.get(backupZKServerIndex);
+ 
+     standaloneServerFactory.shutdown();
+diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java
+index fece82e266..181ea5d6d5 100644
+--- service/src/java/org/apache/hive/service/server/HiveServer2.java
++++ service/src/java/org/apache/hive/service/server/HiveServer2.java
+@@ -43,14 +43,12 @@
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.api.ACLProvider;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorEventType;
+ import org.apache.curator.framework.recipes.leader.LeaderLatch;
+ import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.common.JvmPauseMonitor;
+ import org.apache.hadoop.hive.common.LogUtils;
+@@ -1078,14 +1076,9 @@ private void maybeStartCompactorThreads(HiveConf hiveConf) throws Exception {
+    */
+   static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
+     HiveConf hiveConf = new HiveConf();
+-    String zooKeeperEnsemble = hiveConf.getZKConfig().getQuorumServers();
+-    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+-    int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+-    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+-    CuratorFramework zooKeeperClient =
+-        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+-            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
++    CuratorFramework zooKeeperClient = hiveConf.getZKConfig().getNewZookeeperClient();
+     zooKeeperClient.start();
++    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+     List<String> znodePaths =
+         zooKeeperClient.getChildren().forPath(
+             ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
+new file mode 100644
+index 0000000000..ee01731fa9
+--- /dev/null
++++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
+@@ -0,0 +1,78 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.hive.common;
++
++import org.apache.commons.lang3.StringUtils;
++import org.apache.curator.utils.ZookeeperFactory;
++import org.apache.zookeeper.Watcher;
++import org.apache.zookeeper.ZooKeeper;
++import org.apache.zookeeper.client.ZKClientConfig;
++import org.apache.zookeeper.common.ClientX509Util;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * Factory that creates ZooKeeper clients. When SSL is enabled, zookeeper.client.secure and the key/trust store
++ * settings are set on the client configuration; otherwise a plain ZooKeeper client is created.
++ */
++public class SSLZookeeperFactory implements ZookeeperFactory {
++
++  private static final Logger LOG = LoggerFactory.getLogger(SSLZookeeperFactory.class);
++
++  private boolean sslEnabled;
++  private String keyStoreLocation;
++  private String keyStorePassword;
++  private String trustStoreLocation;
++  private String trustStorePassword;
++
++  public SSLZookeeperFactory(boolean sslEnabled, String keyStoreLocation, String keyStorePassword,
++      String trustStoreLocation, String trustStorePassword) {
++
++    this.sslEnabled = sslEnabled;
++    this.keyStoreLocation = keyStoreLocation;
++    this.keyStorePassword = keyStorePassword;
++    this.trustStoreLocation = trustStoreLocation;
++    this.trustStorePassword = trustStorePassword;
++    if (sslEnabled) {
++      if (StringUtils.isEmpty(keyStoreLocation)) {
++        LOG.warn("Missing keystoreLocation parameter");
++      }
++      if (StringUtils.isEmpty(trustStoreLocation)) {
++        LOG.warn("Missing trustStoreLocation parameter");
++      }
++    }
++  }
++
++  @Override
++  public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
++      boolean canBeReadOnly) throws Exception {
++    if (!this.sslEnabled) {
++      return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
++    }
++    ZKClientConfig clientConfig = new ZKClientConfig();
++    clientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
++    clientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
++    ClientX509Util x509Util = new ClientX509Util();
++    clientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(), this.keyStoreLocation);
++    clientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(), this.keyStorePassword);
++    clientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(), this.trustStoreLocation);
++    clientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(), this.trustStorePassword);
++    return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, clientConfig);
++  }
++}
+diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+index 99f7c97877..71d8651712 100644
+--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
++++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+@@ -23,10 +23,11 @@
+ import java.util.ArrayList;
+ import java.util.concurrent.TimeUnit;
+ import java.util.List;
++
+ import org.apache.curator.framework.api.ACLProvider;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
++import org.apache.curator.framework.recipes.nodes.PersistentNode;
+ import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+@@ -55,28 +56,164 @@
+   public static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHiveHelper.class.getName());
+   public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
+ 
+-  private String quorum = null;
+-  private String clientPort = null;
+-  private String rootNamespace = null;
+-  private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
++  /**
++   * ZooKeeperHiveHelperBuilder. A builder class to initialize ZooKeeperHiveHelper.
++   */
++  public static class ZooKeeperHiveHelperBuilder {
++    private String quorum = null;
++    private String clientPort = null;
++    private String serverRegistryNameSpace = null;
++    private int connectionTimeout;
++    private int sessionTimeout;
++    private int baseSleepTime;
++    private int maxRetries;
++    private boolean sslEnabled = false;
++    private String keyStoreLocation = null;
++    private String keyStorePassword = null;
++    private String trustStoreLocation = null;
++    private String trustStorePassword = null;
++
++    public ZooKeeperHiveHelper build() {
++      return new ZooKeeperHiveHelper(this);
++    }
++
++    public ZooKeeperHiveHelperBuilder quorum(String quorum) {
++      this.quorum = quorum;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder clientPort(String clientPort) {
++      this.clientPort = clientPort;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder serverRegistryNameSpace(String serverRegistryNameSpace) {
++      this.serverRegistryNameSpace = serverRegistryNameSpace;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder connectionTimeout(int connectionTimeout) {
++      this.connectionTimeout = connectionTimeout;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder sessionTimeout(int sessionTimeout) {
++      this.sessionTimeout = sessionTimeout;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder baseSleepTime(int baseSleepTime) {
++      this.baseSleepTime = baseSleepTime;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder maxRetries(int maxRetries) {
++      this.maxRetries = maxRetries;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder sslEnabled(boolean sslEnabled) {
++      this.sslEnabled = sslEnabled;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder keyStoreLocation(String keyStoreLocation) {
++      this.keyStoreLocation = keyStoreLocation;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder keyStorePassword(String keyStorePassword) {
++      this.keyStorePassword = keyStorePassword;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder trustStoreLocation(String trustStoreLocation) {
++      this.trustStoreLocation = trustStoreLocation;
++      return this;
++    }
++
++    public ZooKeeperHiveHelperBuilder trustStorePassword(String trustStorePassword) {
++      this.trustStorePassword = trustStorePassword;
++      return this;
++    }
++
++    public String getQuorum() {
++      return quorum;
++    }
++
++    public String getClientPort() {
++      return clientPort;
++    }
++
++    public String getServerRegistryNameSpace() {
++      return serverRegistryNameSpace;
++    }
++
++    public int getConnectionTimeout() {
++      return connectionTimeout;
++    }
++
++    public int getSessionTimeout() {
++      return sessionTimeout;
++    }
++
++    public int getBaseSleepTime() {
++      return baseSleepTime;
++    }
++
++    public int getMaxRetries() {
++      return maxRetries;
++    }
++
++    public boolean isSslEnabled() {
++      return sslEnabled;
++    }
++
++    public String getKeyStoreLocation() {
++      return keyStoreLocation;
++    }
++
++    public String getKeyStorePassword() {
++      return keyStorePassword;
++    }
++
++    public String getTrustStoreLocation() {
++      return trustStoreLocation;
++    }
++
++    public String getTrustStorePassword() {
++      return trustStorePassword;
++    }
++  }
++
++  public static ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder builder() {
++    return new ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder();
++  }
++
++  private String quorum;
++  private String rootNamespace;
++  private int connectionTimeout;
+   private int sessionTimeout;
+   private int baseSleepTime;
+   private int maxRetries;
++  private boolean sslEnabled;
+ 
++  private SSLZookeeperFactory sslZookeeperFactory;
+   private CuratorFramework zooKeeperClient;
+-  private PersistentEphemeralNode znode;
++  private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
++  private PersistentNode znode;
++
+ 
+-  public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespace,
+-                             int sessionTimeout, int baseSleepTime, int maxRetries) {
++  public ZooKeeperHiveHelper(ZooKeeperHiveHelperBuilder builder) {
+     // Get the ensemble server addresses in the format host1:port1, host2:port2, ... . Append
+     // the configured port to hostname if the hostname doesn't contain a port.
+-    String[] hosts = quorum.split(",");
++    String[] hosts = builder.getQuorum().split(",");
+     StringBuilder quorumServers = new StringBuilder();
+     for (int i = 0; i < hosts.length; i++) {
+       quorumServers.append(hosts[i].trim());
+       if (!hosts[i].contains(":")) {
+         quorumServers.append(":");
+-        quorumServers.append(clientPort);
++        quorumServers.append(builder.getClientPort());
+       }
+ 
+       if (i != hosts.length - 1) {
+@@ -85,11 +222,19 @@ public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespac
+     }
+ 
+     this.quorum = quorumServers.toString();
+-    this.clientPort = clientPort;
+-    this.rootNamespace = rootNamespace;
+-    this.sessionTimeout = sessionTimeout;
+-    this.baseSleepTime = baseSleepTime;
+-    this.maxRetries = maxRetries;
++    this.rootNamespace = builder.getServerRegistryNameSpace();
++    this.connectionTimeout = builder.getConnectionTimeout();
++    this.sessionTimeout = builder.getSessionTimeout();
++    this.baseSleepTime = builder.getBaseSleepTime();
++    this.maxRetries = builder.getMaxRetries();
++    this.sslEnabled = builder.isSslEnabled();
++    this.sslZookeeperFactory =
++        new SSLZookeeperFactory(sslEnabled,
++            builder.getKeyStoreLocation(),
++            builder.getKeyStorePassword(),
++            builder.getTrustStoreLocation(),
++            builder.getTrustStorePassword());
++
+   }
+ 
+   /**
+@@ -118,8 +263,7 @@ public void addServerInstanceToZooKeeper(String znodePathPrefix, String znodeDat
+                       + ZOOKEEPER_PATH_SEPARATOR + znodePathPrefix;
+       byte[] znodeDataUTF8 = znodeData.getBytes(StandardCharsets.UTF_8);
+       znode =
+-              new PersistentEphemeralNode(zooKeeperClient,
+-                      PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
++              new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false, pathPrefix, znodeDataUTF8);
+       znode.start();
+       // We'll wait for 120s for node creation
+       long znodeCreationTimeout = 120;
+@@ -147,17 +291,7 @@ public void addServerInstanceToZooKeeper(String znodePathPrefix, String znodeDat
+ 
+   public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
+                                                boolean addParentNode) throws Exception {
+-    String zooKeeperEnsemble = getQuorumServers();
+-    // Create a CuratorFramework instance to be used as the ZooKeeper client.
+-    // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
+-    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+-            .connectString(zooKeeperEnsemble)
+-            .sessionTimeoutMs(sessionTimeout)
+-            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
+-    if (zooKeeperAclProvider != null) {
+-      builder = builder.aclProvider(zooKeeperAclProvider);
+-    }
+-    CuratorFramework zkClient = builder.build();
++    CuratorFramework zkClient = getNewZookeeperClient(zooKeeperAclProvider);
+     zkClient.start();
+ 
+     // Create the parent znodes recursively; ignore if the parent already exists.
+@@ -177,6 +311,39 @@ public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
+     }
+     return zkClient;
+   }
++  public CuratorFramework getNewZookeeperClient() {
++    return getNewZookeeperClient(null, null);
++  }
++  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider) {
++    return getNewZookeeperClient(zooKeeperAclProvider, null);
++  }
++
++  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider, String nameSpace) {
++    LOG.info("Creating curator client with connectString: {} namespace: {} sessionTimeoutMs: {}" +
++            " connectionTimeoutMs: {} exponentialBackoff - sleepTime: {} maxRetries: {} sslEnabled: {}",
++        quorum, nameSpace,  sessionTimeout,
++        connectionTimeout, baseSleepTime, maxRetries, sslEnabled);
++    // Create a CuratorFramework instance to be used as the ZooKeeper client.
++    // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
++    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
++            .connectString(quorum)
++            .namespace(nameSpace)
++            .zookeeperFactory(this.sslZookeeperFactory);
++    if (connectionTimeout > 0) {
++      builder = builder.connectionTimeoutMs(connectionTimeout);
++    }
++    if (sessionTimeout > 0) {
++      builder = builder.sessionTimeoutMs(sessionTimeout);
++    }
++    if (maxRetries > 0) {
++      builder = builder.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
++    }
++    if (zooKeeperAclProvider != null) {
++      builder = builder.aclProvider(zooKeeperAclProvider);
++    }
++
++    return builder.build();
++  }
+ 
+   public void removeServerInstanceFromZooKeeper() throws Exception {
+     setDeregisteredWithZooKeeper(true);
+@@ -185,7 +352,9 @@ public void removeServerInstanceFromZooKeeper() throws Exception {
+       znode.close();
+       znode = null;
+     }
+-    zooKeeperClient.close();
++    if (zooKeeperClient != null) {
++      zooKeeperClient.close();
++    }
+     LOG.info("Server instance removed from ZooKeeper.");
+   }
+ 
+diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+index fc6a2fd43a..d6a6c96a6a 100644
+--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
++++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+@@ -258,7 +258,11 @@ public String toString() {
+       ConfVars.SSL_TRUSTSTORE_PASSWORD.varname,
+       ConfVars.SSL_TRUSTSTORE_PASSWORD.hiveName,
+       ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.varname,
+-      ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName
++      ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName,
++      ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
++      ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.hiveName,
++      ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
++      ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.hiveName
+   );
+ 
+   public static ConfVars getMetaConf(String name) {
+@@ -1072,32 +1076,56 @@ public static ConfVars getMetaConf(String name) {
+             "If ZooKeeper is configured for Kerberos authentication. This could be useful when cluster\n" +
+             "is kerberized, but Zookeeper is not."),
+     THRIFT_ZOOKEEPER_CLIENT_PORT("metastore.zookeeper.client.port",
+-            "hive.metastore.zookeeper.client.port", "2181",
++            "hive.zookeeper.client.port", "2181",
+             "The port of ZooKeeper servers to talk to.\n" +
+                     "If the list of Zookeeper servers specified in hive.metastore.thrift.uris" +
+                     " does not contain port numbers, this value is used."),
+     THRIFT_ZOOKEEPER_SESSION_TIMEOUT("metastore.zookeeper.session.timeout",
+-            "hive.metastore.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
++            "hive.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
+             new TimeValidator(TimeUnit.MILLISECONDS),
+             "ZooKeeper client's session timeout (in milliseconds). The client is disconnected\n" +
+                     "if a heartbeat is not sent in the timeout."),
+     THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT("metastore.zookeeper.connection.timeout",
+-            "hive.metastore.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
++            "hive.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
+             new TimeValidator(TimeUnit.SECONDS),
+             "ZooKeeper client's connection timeout in seconds. " +
+                     "Connection timeout * hive.metastore.zookeeper.connection.max.retries\n" +
+                     "with exponential backoff is when curator client deems connection is lost to zookeeper."),
+     THRIFT_ZOOKEEPER_NAMESPACE("metastore.zookeeper.namespace",
+-            "hive.metastore.zookeeper.namespace", "hive_metastore",
++            "hive.zookeeper.namespace", "hive_metastore",
+             "The parent node under which all ZooKeeper nodes for metastores are created."),
+     THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES("metastore.zookeeper.connection.max.retries",
+-            "hive.metastore.zookeeper.connection.max.retries", 3,
++            "hive.zookeeper.connection.max.retries", 3,
+             "Max number of times to retry when connecting to the ZooKeeper server."),
+     THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME("metastore.zookeeper.connection.basesleeptime",
+-            "hive.metastore.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
++            "hive.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
+             new TimeValidator(TimeUnit.MILLISECONDS),
+             "Initial amount of time (in milliseconds) to wait between retries\n" +
+                     "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
++    THRIFT_ZOOKEEPER_SSL_ENABLE("metastore.zookeeper.ssl.client.enable",
++        "hive.zookeeper.ssl.client.enable", false,
++        "Set client to use TLS when connecting to ZooKeeper.  An explicit value overrides any value set via the " +
++            "zookeeper.client.secure system property (note the different name).  Defaults to false if neither is set."),
++    THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION("metastore.zookeeper.ssl.keystore.location",
++        "hive.zookeeper.ssl.keystore.location", "",
++        "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++            "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
++            "system property (note the camelCase)."),
++    THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("metastore.zookeeper.ssl.keystore.password",
++        "hive.zookeeper.ssl.keystore.password", "",
++        "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++            "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
++            "system property (note the camelCase)."),
++    THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("metastore.zookeeper.ssl.truststore.location",
++        "hive.zookeeper.ssl.truststore.location", "",
++        "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++            "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
++            "system property (note the camelCase)."),
++    THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("metastore.zookeeper.ssl.truststore.password",
++        "hive.zookeeper.ssl.truststore.password", "",
++        "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++            "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
++            "system property (note the camelCase)."),
+     THRIFT_URI_SELECTION("metastore.thrift.uri.selection", "hive.metastore.uri.selection", "RANDOM",
+         new StringSetValidator("RANDOM", "SEQUENTIAL"),
+         "Determines the selection mechanism used by metastore client to connect to remote " +
+@@ -2018,14 +2046,22 @@ public static boolean isEmbeddedMetaStore(String msUri) {
+   }
+ 
+   public static ZooKeeperHiveHelper getZKConfig(Configuration conf) {
+-    return new ZooKeeperHiveHelper(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS),
+-            MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT),
+-            MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE),
+-            (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
+-                    TimeUnit.MILLISECONDS),
+-            (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+-                    TimeUnit.MILLISECONDS),
+-            MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES));
++    return ZooKeeperHiveHelper.builder()
++        .quorum(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS))
++        .clientPort(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT))
++        .serverRegistryNameSpace(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE))
++        .connectionTimeout((int) getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT,
++            TimeUnit.MILLISECONDS))
++        .sessionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
++            TimeUnit.MILLISECONDS))
++        .baseSleepTime((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
++            TimeUnit.MILLISECONDS))
++        .maxRetries(MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES))
++        .sslEnabled(MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_ENABLE))
++        .keyStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
++        .keyStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
++        .trustStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
++        .trustStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
+   }
+ 
+   /**
+diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
+index b35dc7ce4b..239bff6dc9 100644
+--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
++++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
+@@ -46,6 +46,16 @@
+   public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+       "hive.cluster.delegation.token.store.zookeeper.acl";
+   public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation";
++  public static final String DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE =
++      "hive.cluster.delegation.token.store.zookeeper.ssl.client.enable";
++  public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION =
++      "hive.cluster.delegation.token.store.zookeeper.keystore.location";
++  public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD =
++      "hive.cluster.delegation.token.store.zookeeper.keystore.password";
++  public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION =
++      "hive.cluster.delegation.token.store.zookeeper.truststore.location";
++  public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD =
++      "hive.cluster.delegation.token.store.zookeeper.truststore.password";
+ 
+   public MetastoreDelegationTokenManager() {
+   }
+diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
+index af52fcc5f6..dd2af7ef1f 100644
+--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
++++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
+@@ -29,8 +29,8 @@
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.api.ACLProvider;
+ import org.apache.curator.framework.imps.CuratorFrameworkState;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+ import org.apache.hadoop.security.UserGroupInformation;
+@@ -57,12 +57,22 @@
+   protected static final String ZK_SEQ_FORMAT = "%010d";
+   private static final String NODE_KEYS = "/keys";
+   private static final String NODE_TOKENS = "/tokens";
++  private static final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
++      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+ 
+   private String rootNode = "";
+   private volatile CuratorFramework zkSession;
+   private String zkConnectString;
+   private int connectTimeoutMillis;
++  private boolean sslEnabled;
++  private String keyStoreLocation;
++  private String keyStorePassword;
++  private String trustStoreLocation;
++  private String trustStorePassword;
++
+   private List<ACL> newNodeAcl;
++  private Configuration conf;
++  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
+ 
+   /**
+    * ACLProvider permissions will be used in case parent dirs need to be created
+@@ -110,12 +120,7 @@ private boolean isKerberosEnabled(Configuration conf) {
+     }
+   }
+ 
+-  private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
+-      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+-
+-  private Configuration conf;
+ 
+-  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
+ 
+   /**
+    * Default constructor for dynamic instantiation w/ Configurable
+@@ -124,14 +129,22 @@ private boolean isKerberosEnabled(Configuration conf) {
+   protected ZooKeeperTokenStore() {
+   }
+ 
+-  private CuratorFramework getSession() {
++  public CuratorFramework getSession() {
+     if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+       synchronized (this) {
+         if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+-          zkSession =
+-              CuratorFrameworkFactory.builder().connectString(zkConnectString)
+-                  .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
+-                  .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
++          ZooKeeperHiveHelper zkHelper = ZooKeeperHiveHelper.builder()
++              .quorum(zkConnectString)
++              .connectionTimeout(connectTimeoutMillis)
++              .maxRetries(3)
++              .baseSleepTime(1000)
++              .sslEnabled(sslEnabled)
++              .keyStoreLocation(keyStoreLocation)
++              .keyStorePassword(keyStorePassword)
++              .trustStoreLocation(trustStoreLocation)
++              .trustStorePassword(trustStorePassword)
++              .build();
++          zkSession = zkHelper.getNewZookeeperClient(aclDefaultProvider);
+           zkSession.start();
+         }
+       }
+@@ -478,10 +491,14 @@ public void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode sMo
+             + WHEN_ZK_DSTORE_MSG);
+       }
+     }
+-    connectTimeoutMillis =
+-        conf.getInt(
+-            MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+-            CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
++    connectTimeoutMillis = conf.getInt(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
++        CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
++
++    sslEnabled = conf.getBoolean(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, false);
++    keyStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION, "");
++    keyStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD, "");
++    trustStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION, "");
++    trustStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD, "");
+ 
+     String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+     this.newNodeAcl = StringUtils.isNotBlank(aclStr)? parseACLs(aclStr) : getDefaultAcl(conf);
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d50912b..34df01e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2672,6 +2672,25 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS),
         "Initial amount of time (in milliseconds) to wait between retries\n" +
         "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
+    HIVE_ZOOKEEPER_SSL_ENABLE("hive.zookeeper.ssl.client.enable", false,
+        "Set client to use TLS when connecting to ZooKeeper.  An explicit value overrides any value set via the " +
+            "zookeeper.client.secure system property (note the different name).  Defaults to false if neither is set."),
+    HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION("hive.zookeeper.ssl.keystore.location", "",
+        "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+            "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
+            "system property (note the camelCase)."),
+    HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("hive.zookeeper.ssl.keystore.password", "",
+        "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+            "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
+             "system property (note the camelCase)."),
+    HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("hive.zookeeper.ssl.truststore.location", "",
+        "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+            "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
+            "system property (note the camelCase)."),
+    HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("hive.zookeeper.ssl.truststore.password", "",
+        "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+            "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
+             "system property (note the camelCase)."),
 
     // Transactions
     HIVE_TXN_MANAGER("hive.txn.manager",
@@ -4795,14 +4814,18 @@ public class HiveConf extends Configuration {
             "hive.spark.client.rpc.server.address," +
             "hive.spark.client.rpc.server.port," +
             "hive.spark.client.rpc.sasl.mechanisms," +
-            "bonecp.,"+
-            "hive.druid.broker.address.default,"+
-            "hive.druid.coordinator.address.default,"+
-            "hikaricp.,"+
-            "hadoop.bin.path,"+
-            "yarn.bin.path,"+
-            "spark.home,"+
-            "hive.driver.parallel.compilation.global.limit",
+            "bonecp.," +
+            "hive.druid.broker.address.default," +
+            "hive.druid.coordinator.address.default," +
+            "hikaricp.," +
+            "hadoop.bin.path," +
+            "yarn.bin.path," +
+            "spark.home," +
+            "hive.driver.parallel.compilation.global.limit," +
+            "hive.zookeeper.ssl.keystore.location," +
+            "hive.zookeeper.ssl.keystore.password," +
+            "hive.zookeeper.ssl.truststore.location," +
+            "hive.zookeeper.ssl.truststore.password",
         "Comma separated list of configuration options which are immutable at runtime"),
     HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
         METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname
@@ -4817,7 +4840,11 @@ public class HiveConf extends Configuration {
         + ",fs.s3a.proxy.password"
         + ",dfs.adls.oauth2.credential"
         + ",fs.adl.oauth2.credential"
-        + ",fs.azure.account.oauth2.client.secret",
+        + ",fs.azure.account.oauth2.client.secret"
+        + ",hive.zookeeper.ssl.keystore.location"
+        + ",hive.zookeeper.ssl.keystore.password"
+        + ",hive.zookeeper.ssl.truststore.location"
+        + ",hive.zookeeper.ssl.truststore.password",
         "Comma separated list of configuration options which should not be read by normal user like passwords"),
     HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
         "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
@@ -5619,14 +5646,22 @@ public class HiveConf extends Configuration {
    * given HiveConf.
    */
   public ZooKeeperHiveHelper getZKConfig() {
-    return new ZooKeeperHiveHelper(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM),
-            getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT),
-            getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE),
-            (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
-                    TimeUnit.MILLISECONDS),
-            (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
-                    TimeUnit.MILLISECONDS),
-            getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES));
+    return ZooKeeperHiveHelper.builder()
+      .quorum(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM))
+      .clientPort(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT))
+      .serverRegistryNameSpace(getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE))
+      .connectionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
+          TimeUnit.MILLISECONDS))
+      .sessionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+          TimeUnit.MILLISECONDS))
+      .baseSleepTime((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+          TimeUnit.MILLISECONDS))
+      .maxRetries(getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
+      .sslEnabled(getBoolVar(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
+      .keyStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
+      .keyStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
+      .trustStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
+      .trustStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
   }
 
   public HiveConf() {
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
index 1fc8d36..02a8926 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -50,8 +50,12 @@ public class ZooKeeperStorage implements TempletonStorage {
   public String overhead_path = null;
 
   public static final String ZK_HOSTS = "templeton.zookeeper.hosts";
-  public static final String ZK_SESSION_TIMEOUT
-    = "templeton.zookeeper.session-timeout";
+  public static final String ZK_SESSION_TIMEOUT = "templeton.zookeeper.session-timeout";
+  public static final String ZK_SSL_ENABLE = "templeton.zookeeper.ssl.client.enable";
+  public static final String ZK_KEYSTORE_LOCATION = "templeton.zookeeper.keystore.location";
+  public static final String ZK_KEYSTORE_PASSWORD = "templeton.zookeeper.keystore.password";
+  public static final String ZK_TRUSTSTORE_LOCATION = "templeton.zookeeper.truststore.location";
+  public static final String ZK_TRUSTSTORE_PASSWORD = "templeton.zookeeper.truststore.password";
 
   public static final String ENCODING = "UTF-8";
 
@@ -59,27 +63,25 @@ public class ZooKeeperStorage implements TempletonStorage {
 
   private CuratorFramework zk;
 
-  /**
-   * Open a ZooKeeper connection for the JobState.
-   */
-  public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
-    throws IOException {
-    //do we need to add a connection status listener?  What will that do?
-    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
-    CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
-      CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
-    zk.start();
-    return zk;
-  }
 
   /**
    * Open a ZooKeeper connection for the JobState.
    */
   public static CuratorFramework zkOpen(Configuration conf) throws IOException {
-    /*the silly looking call to Builder below is to get the default value of session timeout
-    from Curator which itself exposes it as system property*/
-    return zkOpen(conf.get(ZK_HOSTS),
-      conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
+    ZooKeeperHiveHelper zkHelper = ZooKeeperHiveHelper.builder()
+        .quorum(conf.get(ZK_HOSTS))
+        .sessionTimeout(conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()))
+        .baseSleepTime(1000) // retry values carried over from the former ExponentialBackoffRetry(1000, 3)
+        .maxRetries(3)
+        .sslEnabled(conf.getBoolean(ZK_SSL_ENABLE, false))
+        .keyStoreLocation(conf.get(ZK_KEYSTORE_LOCATION, ""))
+        .keyStorePassword(conf.get(ZK_KEYSTORE_PASSWORD, ""))
+        .trustStoreLocation(conf.get(ZK_TRUSTSTORE_LOCATION, ""))
+        .trustStorePassword(conf.get(ZK_TRUSTSTORE_PASSWORD, ""))
+        .build();
+    CuratorFramework zk = zkHelper.getNewZookeeperClient();
+    zk.start();
+    return zk;
   }
 
   public ZooKeeperStorage() {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
new file mode 100644
index 0000000..084e097
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import org.junit.BeforeClass;
+
+/**
+ * Runs the ZooKeeperTokenStoreTestBase tests with ZooKeeper SSL communication disabled.
+ */
+public class TestZookeeperTokenStorePlain extends ZooKeeperTokenStoreTestBase {
+
+  public TestZookeeperTokenStorePlain(){
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setUpInternal(false);
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
new file mode 100644
index 0000000..4017990
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import org.junit.BeforeClass;
+
+/**
+ * Runs the ZooKeeperTokenStoreTestBase tests with ZooKeeper SSL communication enabled.
+ */
+public class TestZookeeperTokenStoreSSLEnabled extends ZooKeeperTokenStoreTestBase {
+
+  public TestZookeeperTokenStoreSSLEnabled(){
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setUpInternal(true);
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
similarity index 76%
rename from itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
rename to itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
index 603155b..35053e7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
@@ -25,67 +25,84 @@ import java.util.List;
 
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
-import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
 import org.apache.hive.testutils.MiniZooKeeperCluster;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
+import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Test;
+
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
-import org.junit.Before;
-import org.junit.After;
-import org.junit.Test;
 
 /**
  * TestZooKeeperTokenStore.
  */
-public class TestZooKeeperTokenStore {
+public abstract class ZooKeeperTokenStoreTestBase {
+
+  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
+  private static final String TRUST_STORE_NAME = "truststore.jks";
+  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
 
-  private MiniZooKeeperCluster zkCluster = null;
-  private CuratorFramework zkClient = null;
-  private int zkPort = -1;
-  private ZooKeeperTokenStore ts;
+  private static MiniZooKeeperCluster zkCluster = null;
+  private static int zkPort = -1;
+  private static ZooKeeperTokenStore ts;
+  private static boolean zkSslEnabled;
 
-  @Before
-  public void setUp() throws Exception {
+  public static void setUpInternal(boolean sslEnabled) throws Exception{
     File zkDataDir = new File(System.getProperty("test.tmp.dir"));
-    if (this.zkCluster != null) {
+    if (zkCluster != null) {
       throw new IOException("Cluster already running");
     }
-    this.zkCluster = new MiniZooKeeperCluster();
-    this.zkPort = this.zkCluster.startup(zkDataDir);
-    this.zkClient =
-        CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort)
-            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
-    this.zkClient.start();
+    zkCluster = new MiniZooKeeperCluster(sslEnabled);
+    zkPort = zkCluster.startup(zkDataDir);
+    zkSslEnabled = sslEnabled;
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception{
+    zkCluster.shutdown();
+    zkCluster = null;
   }
 
   @After
-  public void tearDown() throws Exception {
-    this.zkClient.close();
+  public void closeTokenStore() throws Exception{
     if (ts != null) {
       ts.close();
     }
-    this.zkCluster.shutdown();
-    this.zkCluster = null;
   }
 
   private Configuration createConf(String zkPath) {
     Configuration conf = new Configuration();
     conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:"
-        + this.zkPort);
+        + zkPort);
     conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath);
+    if(zkSslEnabled) {
+      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
+          System.getProperty("test.data.files") :
+          (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION,
+          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD,
+          KEY_STORE_TRUST_STORE_PASSWORD);
+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION,
+          dataFileDir + File.separator + TRUST_STORE_NAME);
+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD,
+          KEY_STORE_TRUST_STORE_PASSWORD);
+      conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, "true");
+
+    }
     return conf;
   }
 
@@ -98,6 +115,7 @@ public class TestZooKeeperTokenStore {
     ts.setConf(conf);
     ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
 
+    CuratorFramework zkClient = ts.getSession();
 
     String metastore_zk_path = ZK_PATH + ServerMode.METASTORE;
     int keySeq = ts.addMasterKey("key1Data");
@@ -112,6 +130,7 @@ public class TestZooKeeperTokenStore {
 
     ts.removeMasterKey(keySeq);
     assertEquals("expected number keys", 1, ts.getMasterKeys().length);
+    ts.removeMasterKey(keySeq2);
 
     // tokens
     DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(
@@ -189,6 +208,8 @@ public class TestZooKeeperTokenStore {
     ts = new ZooKeeperTokenStore();
     ts.setConf(conf);
     ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
+
+    CuratorFramework zkClient = ts.getSession();
     List<ACL> acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE);
     assertEquals(2, acl.size());
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
index cc32a7e..596c3d6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
@@ -111,6 +111,10 @@ public class TestRestrictedList {
     addToExpectedRestrictedMap("spark.home");
     addToExpectedRestrictedMap("hive.privilege.synchronizer.interval");
     addToExpectedRestrictedMap("hive.driver.parallel.compilation.global.limit");
+    addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.location");
+    addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.password");
+    addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.location");
+    addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.password");
   }
 
   @AfterClass
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
index bd5e811..3322434 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
@@ -22,11 +22,10 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentNode;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
@@ -165,9 +164,8 @@ public class TestServiceDiscovery {
     // Publish configs for this instance as the data on the node
     znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confs);
     byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
-    PersistentEphemeralNode znode =
-      new PersistentEphemeralNode(client,
-        PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
+    PersistentNode znode = new PersistentNode(client, CreateMode.EPHEMERAL_SEQUENTIAL,
+        false, pathPrefix, znodeDataUTF8);
     znode.start();
     // We'll wait for 120s for node creation
     long znodeCreationTimeout = 120;
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
similarity index 95%
rename from itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
rename to itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
index de2e493..7302e09 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.hive.service.server;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -54,14 +55,14 @@ import org.apache.hive.service.cli.CLIServiceClient;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.SessionHandle;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
  * Test restricted information schema with privilege synchronization
  */
-public class TestInformationSchemaWithPrivilege {
+public abstract class InformationSchemaWithPrivilegeTestBase {
 
   // Group mapping:
   // group_a: user1, user2
@@ -175,14 +176,18 @@ public class TestInformationSchemaWithPrivilege {
     }
   }
 
+  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
+  private static final String TRUST_STORE_NAME = "truststore.jks";
+  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
+
   private static MiniHS2 miniHS2 = null;
   private static MiniZooKeeperCluster zkCluster = null;
   private static Map<String, String> confOverlay;
 
-  @BeforeClass
-  public static void beforeTest() throws Exception {
+
+  public static void setupInternal(boolean zookeeperSSLEnabled) throws Exception {
     File zkDataDir = new File(System.getProperty("test.tmp.dir"));
-    zkCluster = new MiniZooKeeperCluster();
+    zkCluster = new MiniZooKeeperCluster(zookeeperSSLEnabled);
     int zkPort = zkCluster.startup(zkDataDir);
 
     miniHS2 = new MiniHS2(new HiveConf());
@@ -206,9 +211,34 @@ public class TestInformationSchemaWithPrivilege {
     confOverlay.put(ConfVars.HIVE_AUTHENTICATOR_MANAGER.varname, FakeGroupAuthenticator.class.getName());
     confOverlay.put(ConfVars.HIVE_AUTHORIZATION_ENABLED.varname, "true");
     confOverlay.put(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST.varname, ".*");
+
+    if(zookeeperSSLEnabled) {
+      String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
+          System.getProperty("test.data.files") :
+          (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION.varname,
+          dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
+          KEY_STORE_TRUST_STORE_PASSWORD);
+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION.varname,
+          dataFileDir + File.separator + TRUST_STORE_NAME);
+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
+          KEY_STORE_TRUST_STORE_PASSWORD);
+      confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE.varname, "true");
+    }
     miniHS2.start(confOverlay);
   }
 
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (miniHS2 != null) {
+      miniHS2.stop();
+    }
+    if (zkCluster != null) {
+      zkCluster.shutdown();
+    }
+  }
+
   @Test
   public void test() throws Exception {
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
new file mode 100644
index 0000000..ffa1843
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import org.junit.BeforeClass;
+
+/**
+ * Runs the restricted information schema privilege synchronization test with ZooKeeper SSL communication disabled.
+ */
+public class TestInformationSchemaWithPrivilegeZookeeperPlain extends InformationSchemaWithPrivilegeTestBase {
+
+  public TestInformationSchemaWithPrivilegeZookeeperPlain() {
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception{
+    setupInternal(false);
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
new file mode 100644
index 0000000..e12f494
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import org.junit.BeforeClass;
+
+/**
+ * Runs the restricted information schema privilege synchronization test with ZooKeeper SSL communication enabled.
+ */
+public class TestInformationSchemaWithPrivilegeZookeeperSSL extends InformationSchemaWithPrivilegeTestBase {
+
+  public TestInformationSchemaWithPrivilegeZookeeperSSL() {
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception{
+    setupInternal(true);
+  }
+}
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index e23826e..6cb6853 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -116,6 +116,11 @@ public class Utils {
     public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
     public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA";
     public static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
+    public static final String ZOOKEEPER_SSL_ENABLE = "zooKeeperSSLEnable";
+    public static final String ZOOKEEPER_KEYSTORE_LOCATION = "zooKeeperKeystoreLocation";
+    public static final String ZOOKEEPER_KEYSTORE_PASSWORD= "zooKeeperKeystorePassword";
+    public static final String ZOOKEEPER_TRUSTSTORE_LOCATION  = "zooKeeperTruststoreLocation";
+    public static final String ZOOKEEPER_TRUSTSTORE_PASSWORD = "zooKeeperTruststorePassword";
     // Default namespace value on ZooKeeper.
     // This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
     static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
@@ -168,6 +173,11 @@ public class Utils {
     private boolean isEmbeddedMode = false;
     private String suppliedURLAuthority;
     private String zooKeeperEnsemble = null;
+    private boolean zooKeeperSslEnabled = false;
+    private String zookeeperKeyStoreLocation = "";
+    private String zookeeperKeyStorePassword = "";
+    private String zookeeperTrustStoreLocation = "";
+    private String zookeeperTrustStorePassword = "";
     private String currentHostZnodePath;
     private final List<String> rejectedHostZnodePaths = new ArrayList<String>();
 
@@ -185,6 +195,12 @@ public class Utils {
       this.isEmbeddedMode = params.isEmbeddedMode;
       this.suppliedURLAuthority = params.suppliedURLAuthority;
       this.zooKeeperEnsemble = params.zooKeeperEnsemble;
+      this.zooKeeperSslEnabled = params.zooKeeperSslEnabled;
+      this.zookeeperKeyStoreLocation = params.zookeeperKeyStoreLocation;
+      this.zookeeperKeyStorePassword = params.zookeeperKeyStorePassword;
+      this.zookeeperTrustStoreLocation = params.zookeeperTrustStoreLocation;
+      this.zookeeperTrustStorePassword = params.zookeeperTrustStorePassword;
+
       this.currentHostZnodePath = params.currentHostZnodePath;
       this.rejectedHostZnodePaths.addAll(rejectedHostZnodePaths);
     }
@@ -228,6 +244,25 @@ public class Utils {
     public String getZooKeeperEnsemble() {
       return zooKeeperEnsemble;
     }
+    public boolean isZooKeeperSslEnabled() {
+      return zooKeeperSslEnabled;
+    }
+
+    public String getZookeeperKeyStoreLocation() {
+      return zookeeperKeyStoreLocation;
+    }
+
+    public String getZookeeperKeyStorePassword() {
+      return zookeeperKeyStorePassword;
+    }
+
+    public String getZookeeperTrustStoreLocation() {
+      return zookeeperTrustStoreLocation;
+    }
+
+    public String getZookeeperTrustStorePassword() {
+      return zookeeperTrustStorePassword;
+    }
 
     public List<String> getRejectedHostZnodePaths() {
       return rejectedHostZnodePaths;
@@ -277,6 +312,26 @@ public class Utils {
       this.zooKeeperEnsemble = zooKeeperEnsemble;
     }
 
+    public void setZooKeeperSslEnabled(boolean zooKeeperSslEnabled) {
+      this.zooKeeperSslEnabled = zooKeeperSslEnabled;
+    }
+
+    public void setZookeeperKeyStoreLocation(String zookeeperKeyStoreLocation) {
+      this.zookeeperKeyStoreLocation = zookeeperKeyStoreLocation;
+    }
+
+    public void setZookeeperKeyStorePassword(String zookeeperKeyStorePassword) {
+      this.zookeeperKeyStorePassword = zookeeperKeyStorePassword;
+    }
+
+    public void setZookeeperTrustStoreLocation(String zookeeperTrustStoreLocation) {
+      this.zookeeperTrustStoreLocation = zookeeperTrustStoreLocation;
+    }
+
+    public void setZookeeperTrustStorePassword(String zookeeperTrustStorePassword) {
+      this.zookeeperTrustStorePassword = zookeeperTrustStorePassword;
+    }
+
     public void setCurrentHostZnodePath(String currentHostZnodePath) {
       this.currentHostZnodePath = currentHostZnodePath;
     }
@@ -485,6 +540,7 @@ public class Utils {
         uri = uri.replace(dummyAuthorityString, authorityStr);
         // Set ZooKeeper ensemble in connParams for later use
         connParams.setZooKeeperEnsemble(authorityStr);
+        ZooKeeperHiveClientHelper.setZkSSLParams(connParams);
       } else {
         URI jdbcBaseURI = URI.create(URI_HIVE_PREFIX + "//" + authorityStr);
         // Check to prevent unintentional use of embedded mode. A missing "/"
@@ -576,7 +632,6 @@ public class Utils {
    * host:port pairs.
    *
    * @param uri
-   * @param connParams
    * @return
    * @throws JdbcUriParseException
    */
diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
index 759ba8a..3d89fa2 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
@@ -32,6 +32,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.SSLZookeeperFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
 import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
@@ -85,11 +86,40 @@ class ZooKeeperHiveClientHelper {
       JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode));
   }
 
+  /**
+   * Reads the ZooKeeper SSL settings from the session variables of connParams and stores them on connParams.
+   * @param connParams connection parameters; keystore/truststore values are only set when SSL is enabled
+   */
+  public static void setZkSSLParams(JdbcConnectionParams connParams) {
+    Map<String, String> sessionConf = connParams.getSessionVars();
+    boolean sslEnabled = false;
+    if (sessionConf.containsKey(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE)) {
+      sslEnabled = Boolean.parseBoolean(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE));
+      connParams.setZooKeeperSslEnabled(sslEnabled);
+    }
+    if (sslEnabled) {
+      connParams.setZookeeperKeyStoreLocation(
+          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_LOCATION), ""));
+      connParams.setZookeeperKeyStorePassword(
+          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_PASSWORD), ""));
+      connParams.setZookeeperTrustStoreLocation(
+          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_LOCATION), ""));
+      connParams.setZookeeperTrustStorePassword(
+          StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_PASSWORD), ""));
+    }
+  }
+
   private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception {
     String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
     CuratorFramework zooKeeperClient =
-        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
-            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+        CuratorFrameworkFactory.builder()
+            .connectString(zooKeeperEnsemble)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+            .zookeeperFactory(
+            new SSLZookeeperFactory(connParams.isZooKeeperSslEnabled(), connParams.getZookeeperKeyStoreLocation(),
+                connParams.getZookeeperKeyStorePassword(), connParams.getZookeeperTrustStoreLocation(),
+                connParams.getZookeeperTrustStorePassword()))
+            .build();
     zooKeeperClient.start();
     return zooKeeperClient;
   }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index d28fd17..2b21baa 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -30,20 +30,18 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.framework.recipes.nodes.PersistentNode;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapUtil;
@@ -55,6 +53,7 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMars
 import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.InvalidACLException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
@@ -111,7 +110,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private final Map<String, Set<InstanceType>> nodeToInstanceCache;
 
   // The registration znode.
-  private PersistentEphemeralNode znode;
+  private PersistentNode znode;
   private String znodePath; // unique identity for this instance
 
   private PathChildrenCache instancesCache; // Created on demand.
@@ -212,28 +211,23 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   }
 
   private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
-    String zkEnsemble = getQuorumServers(conf);
-    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
-      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
-    int connectionTimeout = (int) HiveConf.getTimeVar(conf,
-      ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
-      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
-    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
-
-    LOG.info("Creating curator client with connectString: {} sessionTimeoutMs: {} connectionTimeoutMs: {}" +
-      " namespace: {} exponentialBackoff - sleepTime: {} maxRetries: {}", zkEnsemble, sessionTimeout,
-      connectionTimeout, namespace, baseSleepTime, maxRetries);
-    // Create a CuratorFramework instance to be used as the ZooKeeper client
-    // Use the zooKeeperAclProvider to create appropriate ACLs
-    return CuratorFrameworkFactory.builder()
-      .connectString(zkEnsemble)
-      .sessionTimeoutMs(sessionTimeout)
-      .connectionTimeoutMs(connectionTimeout)
-      .aclProvider(zooKeeperAclProvider)
-      .namespace(namespace)
-      .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
-      .build();
+    return ZooKeeperHiveHelper.builder()
+        .quorum(conf.get(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname))
+        .clientPort(conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
+            ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()))
+        .connectionTimeout(
+            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
+        .sessionTimeout(
+            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS))
+        .baseSleepTime(
+            (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS))
+        .maxRetries(HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
+        .sslEnabled(HiveConf.getBoolVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
+        .keyStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
+        .keyStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
+        .trustStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
+        .trustStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD))
+        .build().getNewZookeeperClient(zooKeeperAclProvider, namespace);
   }
 
   private static List<ACL> createSecureAcls() {
@@ -283,9 +277,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
-      // PersistentEphemeralNode will make sure the ephemeral node created on server will be present
+      // PersistentNode will make sure the ephemeral node created on server will be present
       // even under connection or session interruption (will automatically handle retries)
-      znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
+      znode = new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false,
           workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
     
       // start the creation of znodes
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
index fa3a382..e8eaac0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
@@ -18,14 +18,11 @@
 
 package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
 
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.hive.conf.HiveConf;
 
 public class CuratorFrameworkSingleton {
@@ -50,15 +47,8 @@ public class CuratorFrameworkSingleton {
       } else {
         conf = hiveConf;
       }
-      int sessionTimeout =  (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
-      int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
-      int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
-      String quorumServers = conf.getZKConfig().getQuorumServers();
 
-      sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers)
-          .sessionTimeoutMs(sessionTimeout)
-          .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
-          .build();
+      sharedClient = conf.getZKConfig().getNewZookeeperClient();
       sharedClient.start();
     }
 
diff --git a/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java b/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
index eec6282..dc11ae1 100644
--- a/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
+++ b/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
@@ -34,7 +34,9 @@ import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.X509Util;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.slf4j.Logger;
@@ -54,6 +56,9 @@ public class MiniZooKeeperCluster {
 
   private static final int TICK_TIME = 2000;
   private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
+  private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
+  private static final String TRUST_STORE_NAME = "truststore.jks";
+  private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
   private int connectionTimeout;
 
   private boolean started;
@@ -61,7 +66,7 @@ public class MiniZooKeeperCluster {
   /** The default port. If zero, we use a random port. */
   private int defaultClientPort = 0;
 
-  private List<NIOServerCnxnFactory> standaloneServerFactoryList;
+  private List<ServerCnxnFactory> standaloneServerFactoryList;
   private List<ZooKeeperServer> zooKeeperServers;
   private List<Integer> clientPortList;
 
@@ -70,11 +75,20 @@ public class MiniZooKeeperCluster {
 
   private Configuration configuration;
 
+  private boolean sslEnabled = false;
+
   public MiniZooKeeperCluster() {
     this(new Configuration());
   }
-
+  public MiniZooKeeperCluster(boolean sslEnabled) {
+    this(new Configuration(), sslEnabled);
+  }
   public MiniZooKeeperCluster(Configuration configuration) {
+    this(configuration, false);
+  }
+
+  public MiniZooKeeperCluster(Configuration configuration, boolean sslEnabled) {
+    this.sslEnabled = sslEnabled;
     this.started = false;
     this.configuration = configuration;
     activeZKServerIndex = -1;
@@ -167,7 +181,7 @@ public class MiniZooKeeperCluster {
   }
 
   // / XXX: From o.a.zk.t.ClientBase
-  private static void setupTestEnv() {
+  private static void setupTestEnv(boolean sslEnabled) {
     // With ZooKeeper 3.5 we need to whitelist the 4 letter commands we use
     System.setProperty("zookeeper.4lw.commands.whitelist", "*");
 
@@ -176,6 +190,9 @@ public class MiniZooKeeperCluster {
     // resulting in test failure (client timeout on first session).
     // set env and directly in order to handle static init/gc issues
     System.setProperty("zookeeper.preAllocSize", "100");
+    if (sslEnabled) {
+      System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
+    }
     FileTxnLog.setPreallocSize(100 * 1024);
   }
 
@@ -200,7 +217,7 @@ public class MiniZooKeeperCluster {
       return -1;
     }
 
-    setupTestEnv();
+    setupTestEnv(sslEnabled);
     shutdown();
 
     int tentativePort = -1; // the seed port
@@ -229,12 +246,10 @@ public class MiniZooKeeperCluster {
       // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
       server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
       server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
-      NIOServerCnxnFactory standaloneServerFactory;
+      ServerCnxnFactory standaloneServerFactory;
       while (true) {
         try {
-          standaloneServerFactory = new NIOServerCnxnFactory();
-          standaloneServerFactory.configure(new InetSocketAddress(currentClientPort),
-              configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
+          standaloneServerFactory = createServerCnxnFactory(currentClientPort);
         } catch (BindException e) {
           LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
           // We're told to use some port but it's occupied, fail
@@ -252,7 +267,7 @@ public class MiniZooKeeperCluster {
       // Start up this ZK server
       standaloneServerFactory.startup(server);
       // Runs a 'stat' against the servers.
-      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
+      if (!sslEnabled && !waitForServerUp(currentClientPort, connectionTimeout)) {
         throw new IOException("Waiting for startup of standalone server");
       }
 
@@ -276,6 +291,36 @@ public class MiniZooKeeperCluster {
     return clientPort;
   }
 
+  /**
+   * Creates and configures the connection factory for one server. With SSL enabled, the Netty
+   * factory class and the test key/trust stores are first published as system properties.
+   */
+  private ServerCnxnFactory createServerCnxnFactory(int currentClientPort) throws IOException {
+    if (!sslEnabled) {
+      ServerCnxnFactory plainFactory = ServerCnxnFactory.createFactory();
+      plainFactory.configure(new InetSocketAddress(currentClientPort),
+          configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
+      return plainFactory;
+    }
+    System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
+        "org.apache.zookeeper.server.NettyServerCnxnFactory");
+    String dataFilesProperty = System.getProperty("test.data.files", "");
+    String storeDir = !dataFilesProperty.isEmpty() ? dataFilesProperty
+        : configuration.get("test.data.files").replace('\\', '/').replace("c:", "");
+    String storePathPrefix = storeDir + File.separator;
+    X509Util sslProperties = new ClientX509Util();
+    System.setProperty(sslProperties.getSslKeystoreLocationProperty(), storePathPrefix + LOCALHOST_KEY_STORE_NAME);
+    System.setProperty(sslProperties.getSslKeystorePasswdProperty(), KEY_STORE_TRUST_STORE_PASSWORD);
+    System.setProperty(sslProperties.getSslTruststoreLocationProperty(), storePathPrefix + TRUST_STORE_NAME);
+    System.setProperty(sslProperties.getSslTruststorePasswdProperty(), KEY_STORE_TRUST_STORE_PASSWORD);
+    ServerCnxnFactory secureFactory = ServerCnxnFactory.createFactory();
+    secureFactory.configure(new InetSocketAddress(currentClientPort),
+        configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS),
+        true);
+    return secureFactory;
+  }
+
+
   private void createDir(File dir) throws IOException {
     try {
       if (!dir.exists()) {
@@ -292,7 +337,7 @@ public class MiniZooKeeperCluster {
   public void shutdown() throws IOException {
     // shut down all the zk servers
     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
-      NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
+      ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
       int clientPort = clientPortList.get(i);
 
       standaloneServerFactory.shutdown();
@@ -305,6 +350,7 @@ public class MiniZooKeeperCluster {
     for (ZooKeeperServer zkServer : zooKeeperServers) {
       //explicitly close ZKDatabase since ZookeeperServer does not close them
       zkServer.getZKDatabase().close();
+      zkServer.shutdown(true);
     }
     zooKeeperServers.clear();
 
@@ -328,7 +374,7 @@ public class MiniZooKeeperCluster {
     }
 
     // Shutdown the current active one
-    NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
+    ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
     int clientPort = clientPortList.get(activeZKServerIndex);
 
     standaloneServerFactory.shutdown();
@@ -366,7 +412,7 @@ public class MiniZooKeeperCluster {
 
     int backupZKServerIndex = activeZKServerIndex + 1;
     // Shutdown the current active one
-    NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
+    ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
     int clientPort = clientPortList.get(backupZKServerIndex);
 
     standaloneServerFactory.shutdown();
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index fece82e..181ea5d 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -43,14 +43,12 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JvmPauseMonitor;
 import org.apache.hadoop.hive.common.LogUtils;
@@ -1078,14 +1076,9 @@ public class HiveServer2 extends CompositeService {
    */
   static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
     HiveConf hiveConf = new HiveConf();
-    String zooKeeperEnsemble = hiveConf.getZKConfig().getQuorumServers();
-    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
-    int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
-    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
-    CuratorFramework zooKeeperClient =
-        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
-            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
+    CuratorFramework zooKeeperClient = hiveConf.getZKConfig().getNewZookeeperClient();
     zooKeeperClient.start();
+    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
     List<String> znodePaths =
         zooKeeperClient.getChildren().forPath(
             ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
new file mode 100644
index 0000000..ee01731
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Curator ZookeeperFactory that creates plain ZooKeeper clients or, when SSL is enabled, clients
+ * with zookeeper.client.secure switched on and the configured key store and trust store.
+ */
+public class SSLZookeeperFactory implements ZookeeperFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SSLZookeeperFactory.class);
+
+  private final boolean sslEnabled;
+  private final String keyStoreLocation;
+  private final String keyStorePassword;
+  private final String trustStoreLocation;
+  private final String trustStorePassword;
+
+  /** Stores the SSL settings; with SSL enabled, a missing store location is only logged as a warning. */
+  public SSLZookeeperFactory(boolean sslEnabled, String keyStoreLocation, String keyStorePassword,
+      String trustStoreLocation, String trustStorePassword) {
+    this.sslEnabled = sslEnabled;
+    this.keyStoreLocation = keyStoreLocation;
+    this.keyStorePassword = keyStorePassword;
+    this.trustStoreLocation = trustStoreLocation;
+    this.trustStorePassword = trustStorePassword;
+    if (sslEnabled && StringUtils.isEmpty(keyStoreLocation)) {
+      LOG.warn("Missing keystoreLocation parameter");
+    }
+    if (sslEnabled && StringUtils.isEmpty(trustStoreLocation)) {
+      LOG.warn("Missing trustStoreLocation parameter");
+    }
+  }
+
+  /** Creates the ZooKeeper handle: a plain client without SSL; otherwise a Netty-socket client
+   *  whose secure flag, key store and trust store are passed through a ZKClientConfig. */
+  @Override
+  public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
+      boolean canBeReadOnly) throws Exception {
+    if (!this.sslEnabled) {
+      return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+    }
+    ZKClientConfig clientConfig = new ZKClientConfig();
+    clientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+    clientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
+    ClientX509Util x509Util = new ClientX509Util();
+    clientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(), this.keyStoreLocation);
+    clientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(), this.keyStorePassword);
+    clientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(), this.trustStoreLocation);
+    clientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(), this.trustStorePassword);
+    return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, clientConfig);
+  }
+}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
index 99f7c97..71d8651 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
@@ -23,10 +23,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.List;
+
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentNode;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -55,28 +56,164 @@ public class ZooKeeperHiveHelper {
   public static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHiveHelper.class.getName());
   public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
 
-  private String quorum = null;
-  private String clientPort = null;
-  private String rootNamespace = null;
-  private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
+  /**
+   * Builder for ZooKeeperHiveHelper: quorum, client port, registry namespace, timeouts, retries and SSL stores.
+   */
+  public static class ZooKeeperHiveHelperBuilder {
+    private String quorum = null; // comma-separated ZooKeeper hosts, each optionally with :port
+    private String clientPort = null; // appended to quorum hosts that carry no port
+    private String serverRegistryNameSpace = null; // becomes the helper's root namespace
+    private int connectionTimeout; // milliseconds; values <= 0 are not applied to the Curator builder
+    private int sessionTimeout; // milliseconds; values <= 0 are not applied to the Curator builder
+    private int baseSleepTime; // base sleep time of the exponential backoff retry policy
+    private int maxRetries; // maximum retries of the exponential backoff retry policy
+    private boolean sslEnabled = false;
+    private String keyStoreLocation = null;
+    private String keyStorePassword = null;
+    private String trustStoreLocation = null;
+    private String trustStorePassword = null;
+
+    public ZooKeeperHiveHelper build() {
+      return new ZooKeeperHiveHelper(this);
+    }
+
+    public ZooKeeperHiveHelperBuilder quorum(String quorum) {
+      this.quorum = quorum;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder clientPort(String clientPort) {
+      this.clientPort = clientPort;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder serverRegistryNameSpace(String serverRegistryNameSpace) {
+      this.serverRegistryNameSpace = serverRegistryNameSpace;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder connectionTimeout(int connectionTimeout) {
+      this.connectionTimeout = connectionTimeout;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder sessionTimeout(int sessionTimeout) {
+      this.sessionTimeout = sessionTimeout;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder baseSleepTime(int baseSleepTime) {
+      this.baseSleepTime = baseSleepTime;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder maxRetries(int maxRetries) {
+      this.maxRetries = maxRetries;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder sslEnabled(boolean sslEnabled) {
+      this.sslEnabled = sslEnabled;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder keyStoreLocation(String keyStoreLocation) {
+      this.keyStoreLocation = keyStoreLocation;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder keyStorePassword(String keyStorePassword) {
+      this.keyStorePassword = keyStorePassword;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder trustStoreLocation(String trustStoreLocation) {
+      this.trustStoreLocation = trustStoreLocation;
+      return this;
+    }
+
+    public ZooKeeperHiveHelperBuilder trustStorePassword(String trustStorePassword) {
+      this.trustStorePassword = trustStorePassword;
+      return this;
+    }
+
+    public String getQuorum() {
+      return quorum;
+    }
+
+    public String getClientPort() {
+      return clientPort;
+    }
+
+    public String getServerRegistryNameSpace() {
+      return serverRegistryNameSpace;
+    }
+
+    public int getConnectionTimeout() {
+      return connectionTimeout;
+    }
+
+    public int getSessionTimeout() {
+      return sessionTimeout;
+    }
+
+    public int getBaseSleepTime() {
+      return baseSleepTime;
+    }
+
+    public int getMaxRetries() {
+      return maxRetries;
+    }
+
+    public boolean isSslEnabled() {
+      return sslEnabled;
+    }
+
+    public String getKeyStoreLocation() {
+      return keyStoreLocation;
+    }
+
+    public String getKeyStorePassword() {
+      return keyStorePassword;
+    }
+
+    public String getTrustStoreLocation() {
+      return trustStoreLocation;
+    }
+
+    public String getTrustStorePassword() {
+      return trustStorePassword;
+    }
+  }
+
+  public static ZooKeeperHiveHelperBuilder builder() {
+    return new ZooKeeperHiveHelperBuilder(); // fresh builder; every setting starts at its field default
+  }
+
+  private String quorum;
+  private String rootNamespace;
+  private int connectionTimeout;
   private int sessionTimeout;
   private int baseSleepTime;
   private int maxRetries;
+  private boolean sslEnabled;
 
+  private SSLZookeeperFactory sslZookeeperFactory;
   private CuratorFramework zooKeeperClient;
-  private PersistentEphemeralNode znode;
+  private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
+  private PersistentNode znode;
+
 
-  public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespace,
-                             int sessionTimeout, int baseSleepTime, int maxRetries) {
+  public ZooKeeperHiveHelper(ZooKeeperHiveHelperBuilder builder) {
     // Get the ensemble server addresses in the format host1:port1, host2:port2, ... . Append
     // the configured port to hostname if the hostname doesn't contain a port.
-    String[] hosts = quorum.split(",");
+    String[] hosts = builder.getQuorum().split(",");
     StringBuilder quorumServers = new StringBuilder();
     for (int i = 0; i < hosts.length; i++) {
       quorumServers.append(hosts[i].trim());
       if (!hosts[i].contains(":")) {
         quorumServers.append(":");
-        quorumServers.append(clientPort);
+        quorumServers.append(builder.getClientPort());
       }
 
       if (i != hosts.length - 1) {
@@ -85,11 +222,19 @@ public class ZooKeeperHiveHelper {
     }
 
     this.quorum = quorumServers.toString();
-    this.clientPort = clientPort;
-    this.rootNamespace = rootNamespace;
-    this.sessionTimeout = sessionTimeout;
-    this.baseSleepTime = baseSleepTime;
-    this.maxRetries = maxRetries;
+    this.rootNamespace = builder.getServerRegistryNameSpace();
+    this.connectionTimeout = builder.getConnectionTimeout();
+    this.sessionTimeout = builder.getSessionTimeout();
+    this.baseSleepTime = builder.getBaseSleepTime();
+    this.maxRetries = builder.getMaxRetries();
+    this.sslEnabled = builder.isSslEnabled();
+    this.sslZookeeperFactory =
+        new SSLZookeeperFactory(sslEnabled,
+            builder.getKeyStoreLocation(),
+            builder.getKeyStorePassword(),
+            builder.getTrustStoreLocation(),
+            builder.getTrustStorePassword());
+
   }
 
   /**
@@ -118,8 +263,7 @@ public class ZooKeeperHiveHelper {
                       + ZOOKEEPER_PATH_SEPARATOR + znodePathPrefix;
       byte[] znodeDataUTF8 = znodeData.getBytes(StandardCharsets.UTF_8);
       znode =
-              new PersistentEphemeralNode(zooKeeperClient,
-                      PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
+              new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false, pathPrefix, znodeDataUTF8);
       znode.start();
       // We'll wait for 120s for node creation
       long znodeCreationTimeout = 120;
@@ -147,17 +291,7 @@ public class ZooKeeperHiveHelper {
 
   public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
                                                boolean addParentNode) throws Exception {
-    String zooKeeperEnsemble = getQuorumServers();
-    // Create a CuratorFramework instance to be used as the ZooKeeper client.
-    // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
-    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-            .connectString(zooKeeperEnsemble)
-            .sessionTimeoutMs(sessionTimeout)
-            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
-    if (zooKeeperAclProvider != null) {
-      builder = builder.aclProvider(zooKeeperAclProvider);
-    }
-    CuratorFramework zkClient = builder.build();
+    CuratorFramework zkClient = getNewZookeeperClient(zooKeeperAclProvider);
     zkClient.start();
 
     // Create the parent znodes recursively; ignore if the parent already exists.
@@ -177,6 +311,39 @@ public class ZooKeeperHiveHelper {
     }
     return zkClient;
   }
+  /** Returns a new, not yet started client without a custom ACL provider or namespace. */
+  public CuratorFramework getNewZookeeperClient() {
+    return getNewZookeeperClient(null, null);
+  }
+
+  /** Returns a new, not yet started client that uses the given ACL provider (may be null). */
+  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider) {
+    return getNewZookeeperClient(zooKeeperAclProvider, null);
+  }
+
+  /**
+   * Builds, without starting, a Curator client for the configured quorum. Timeouts <= 0 are not applied.
+   * A retry policy is always set, since Curator rejects a null one; maxRetries <= 0 disables retries.
+   */
+  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider, String nameSpace) {
+    LOG.info("Creating curator client with connectString: {} namespace: {} sessionTimeoutMs: {}" +
+            " connectionTimeoutMs: {} exponentialBackoff - sleepTime: {} maxRetries: {} sslEnabled: {}",
+        quorum, nameSpace, sessionTimeout, connectionTimeout, baseSleepTime, maxRetries, sslEnabled);
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+        .connectString(quorum).namespace(nameSpace)
+        .zookeeperFactory(this.sslZookeeperFactory)
+        .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, Math.max(maxRetries, 0)));
+    if (connectionTimeout > 0) {
+      builder = builder.connectionTimeoutMs(connectionTimeout);
+    }
+    if (sessionTimeout > 0) {
+      builder = builder.sessionTimeoutMs(sessionTimeout);
+    }
+    if (zooKeeperAclProvider != null) {
+      builder = builder.aclProvider(zooKeeperAclProvider);
+    }
+    return builder.build();
+  }
 
   public void removeServerInstanceFromZooKeeper() throws Exception {
     setDeregisteredWithZooKeeper(true);
@@ -185,7 +352,9 @@ public class ZooKeeperHiveHelper {
       znode.close();
       znode = null;
     }
-    zooKeeperClient.close();
+    if (zooKeeperClient != null) {
+      zooKeeperClient.close();
+    }
     LOG.info("Server instance removed from ZooKeeper.");
   }
 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index fc6a2fd..d6a6c96 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -258,7 +258,11 @@ public class MetastoreConf {
       ConfVars.SSL_TRUSTSTORE_PASSWORD.varname,
       ConfVars.SSL_TRUSTSTORE_PASSWORD.hiveName,
       ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.varname,
-      ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName
+      ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName,
+      ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
+      ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.hiveName,
+      ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
+      ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.hiveName
   );
 
   public static ConfVars getMetaConf(String name) {
@@ -1072,32 +1076,56 @@ public class MetastoreConf {
             "If ZooKeeper is configured for Kerberos authentication. This could be useful when cluster\n" +
             "is kerberized, but Zookeeper is not."),
     THRIFT_ZOOKEEPER_CLIENT_PORT("metastore.zookeeper.client.port",
-            "hive.metastore.zookeeper.client.port", "2181",
+            "hive.zookeeper.client.port", "2181",
             "The port of ZooKeeper servers to talk to.\n" +
                     "If the list of Zookeeper servers specified in hive.metastore.thrift.uris" +
                     " does not contain port numbers, this value is used."),
     THRIFT_ZOOKEEPER_SESSION_TIMEOUT("metastore.zookeeper.session.timeout",
-            "hive.metastore.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
+            "hive.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
             new TimeValidator(TimeUnit.MILLISECONDS),
             "ZooKeeper client's session timeout (in milliseconds). The client is disconnected\n" +
                     "if a heartbeat is not sent in the timeout."),
     THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT("metastore.zookeeper.connection.timeout",
-            "hive.metastore.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
+            "hive.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
             new TimeValidator(TimeUnit.SECONDS),
             "ZooKeeper client's connection timeout in seconds. " +
                     "Connection timeout * hive.metastore.zookeeper.connection.max.retries\n" +
                     "with exponential backoff is when curator client deems connection is lost to zookeeper."),
     THRIFT_ZOOKEEPER_NAMESPACE("metastore.zookeeper.namespace",
-            "hive.metastore.zookeeper.namespace", "hive_metastore",
+            "hive.zookeeper.namespace", "hive_metastore",
             "The parent node under which all ZooKeeper nodes for metastores are created."),
     THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES("metastore.zookeeper.connection.max.retries",
-            "hive.metastore.zookeeper.connection.max.retries", 3,
+            "hive.zookeeper.connection.max.retries", 3,
             "Max number of times to retry when connecting to the ZooKeeper server."),
     THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME("metastore.zookeeper.connection.basesleeptime",
-            "hive.metastore.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
+            "hive.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
             new TimeValidator(TimeUnit.MILLISECONDS),
             "Initial amount of time (in milliseconds) to wait between retries\n" +
                     "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
+    THRIFT_ZOOKEEPER_SSL_ENABLE("metastore.zookeeper.ssl.client.enable",
+        "hive.zookeeper.ssl.client.enable", false,
+        "Set client to use TLS when connecting to ZooKeeper.  An explicit value overrides any value set via the " +
+            "zookeeper.client.secure system property (note the different name).  Defaults to false if neither is set."),
+    THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION("metastore.zookeeper.ssl.keystore.location",
+        "hive.zookeeper.ssl.keystore.location", "",
+        "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+            "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
+            "system property (note the camelCase)."),
+    THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("metastore.zookeeper.ssl.keystore.password",
+        "hive.zookeeper.ssl.keystore.password", "",
+        "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+            "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
+            "system property (note the camelCase)."),
+    THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("metastore.zookeeper.ssl.truststore.location",
+        "hive.zookeeper.ssl.truststore.location", "",
+        "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+            "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
+            "system property (note the camelCase)."),
+    THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("metastore.zookeeper.ssl.truststore.password",
+        "hive.zookeeper.ssl.truststore.password", "",
+        "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+            "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
+            "system property (note the camelCase)."),
     THRIFT_URI_SELECTION("metastore.thrift.uri.selection", "hive.metastore.uri.selection", "RANDOM",
         new StringSetValidator("RANDOM", "SEQUENTIAL"),
         "Determines the selection mechanism used by metastore client to connect to remote " +
@@ -2018,14 +2046,30 @@ public class MetastoreConf {
   }
 
+  /**
+   * Builds a {@link ZooKeeperHiveHelper} from the metastore's ZooKeeper settings in {@code conf}.
+   * The quorum is taken from THRIFT_URIS; the connection timeout, session timeout and base sleep
+   * time are read with TimeUnit.MILLISECONDS and narrowed to int; SSL options are passed as configured.
+   *
+   * @param conf configuration to read the THRIFT_ZOOKEEPER_* values from
+   * @return a helper configured with those values
+   */
   public static ZooKeeperHiveHelper getZKConfig(Configuration conf) {
-    return new ZooKeeperHiveHelper(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS),
-            MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT),
-            MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE),
-            (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
-                    TimeUnit.MILLISECONDS),
-            (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
-                    TimeUnit.MILLISECONDS),
-            MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES));
+    return ZooKeeperHiveHelper.builder()
+        .quorum(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS))
+        .clientPort(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT))
+        .serverRegistryNameSpace(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE))
+        .connectionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT,
+            TimeUnit.MILLISECONDS))
+        .sessionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
+            TimeUnit.MILLISECONDS))
+        .baseSleepTime((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+            TimeUnit.MILLISECONDS))
+        .maxRetries(MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES))
+        .sslEnabled(MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_ENABLE))
+        .keyStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
+        .keyStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
+        .trustStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
+        .trustStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
   }
 
   /**
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
index b35dc7c..239bff6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
@@ -46,6 +46,16 @@ public class MetastoreDelegationTokenManager {
   public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
       "hive.cluster.delegation.token.store.zookeeper.acl";
   public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation";
+  public static final String DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE =
+      "hive.cluster.delegation.token.store.zookeeper.ssl.client.enable";
+  public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION =
+      "hive.cluster.delegation.token.store.zookeeper.keystore.location";
+  public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD =
+      "hive.cluster.delegation.token.store.zookeeper.keystore.password";
+  public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION =
+      "hive.cluster.delegation.token.store.zookeeper.truststore.location";
+  public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD =
+      "hive.cluster.delegation.token.store.zookeeper.truststore.password";
 
   public MetastoreDelegationTokenManager() {
   }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
index af52fcc..dd2af7e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
@@ -29,8 +29,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -57,12 +57,22 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
   protected static final String ZK_SEQ_FORMAT = "%010d";
   private static final String NODE_KEYS = "/keys";
   private static final String NODE_TOKENS = "/tokens";
+  private static final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
+      + " (hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
 
   private String rootNode = "";
   private volatile CuratorFramework zkSession;
   private String zkConnectString;
   private int connectTimeoutMillis;
+  private boolean sslEnabled;
+  private String keyStoreLocation;
+  private String keyStorePassword;
+  private String trustStoreLocation;
+  private String trustStorePassword;
+
   private List<ACL> newNodeAcl;
+  private Configuration conf;
+  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
 
   /**
    * ACLProvider permissions will be used in case parent dirs need to be created
@@ -110,12 +120,7 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
     }
   }
 
-  private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
-      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
-
-  private Configuration conf;
 
-  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
 
   /**
    * Default constructor for dynamic instantiation w/ Configurable
@@ -124,14 +129,22 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
   protected ZooKeeperTokenStore() {
   }
 
-  private CuratorFramework getSession() {
+  public CuratorFramework getSession() {
     if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
       synchronized (this) {
         if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
-          zkSession =
-              CuratorFrameworkFactory.builder().connectString(zkConnectString)
-                  .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
-                  .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+          ZooKeeperHiveHelper zkHelper = ZooKeeperHiveHelper.builder()
+              .quorum(zkConnectString)
+              .connectionTimeout(connectTimeoutMillis)
+              .maxRetries(3)
+              .baseSleepTime(1000)
+              .sslEnabled(sslEnabled)
+              .keyStoreLocation(keyStoreLocation)
+              .keyStorePassword(keyStorePassword)
+              .trustStoreLocation(trustStoreLocation)
+              .trustStorePassword(trustStorePassword)
+              .build();
+          zkSession = zkHelper.getNewZookeeperClient(aclDefaultProvider);
           zkSession.start();
         }
       }
@@ -478,10 +491,14 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
             + WHEN_ZK_DSTORE_MSG);
       }
     }
-    connectTimeoutMillis =
-        conf.getInt(
-            MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
-            CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
+    connectTimeoutMillis = conf.getInt(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+        CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
+
+    sslEnabled = conf.getBoolean(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, false);
+    keyStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION, "");
+    keyStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD, "");
+    trustStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION, "");
+    trustStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD, "");
 
     String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null);
     this.newNodeAcl = StringUtils.isNotBlank(aclStr)? parseACLs(aclStr) : getDefaultAcl(conf);