You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/03/26 08:25:06 UTC
[hive] branch master updated: HIVE-23045: Zookeeper SSL/TLS support
(Peter Varga via Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new cb4427e HIVE-23045: Zookeeper SSL/TLS support (Peter Varga via Denys Kuzmenko)
cb4427e is described below
commit cb4427e1eaaf27029a397acadc62b8dddcb0437e
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Thu Mar 26 09:22:53 2020 +0100
HIVE-23045: Zookeeper SSL/TLS support (Peter Varga via Denys Kuzmenko)
---
HIVE-23045.10.patch | 1742 ++++++++++++++++++++
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 69 +-
.../hcatalog/templeton/tool/ZooKeeperStorage.java | 40 +-
.../security/TestZookeeperTokenStorePlain.java | 35 +
.../TestZookeeperTokenStoreSSLEnabled.java | 35 +
...Store.java => ZooKeeperTokenStoreTestBase.java} | 71 +-
.../org/apache/hive/jdbc/TestRestrictedList.java | 4 +
.../org/apache/hive/jdbc/TestServiceDiscovery.java | 8 +-
...=> InformationSchemaWithPrivilegeTestBase.java} | 40 +-
...formationSchemaWithPrivilegeZookeeperPlain.java | 35 +
...InformationSchemaWithPrivilegeZookeeperSSL.java | 35 +
jdbc/src/java/org/apache/hive/jdbc/Utils.java | 57 +-
.../hive/jdbc/ZooKeeperHiveClientHelper.java | 34 +-
.../hadoop/hive/registry/impl/ZkRegistryBase.java | 52 +-
.../zookeeper/CuratorFrameworkSingleton.java | 12 +-
.../hive/testutils/MiniZooKeeperCluster.java | 72 +-
.../apache/hive/service/server/HiveServer2.java | 11 +-
.../hadoop/hive/common/SSLZookeeperFactory.java | 78 +
.../hadoop/hive/common/ZooKeeperHiveHelper.java | 227 ++-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 66 +-
.../security/MetastoreDelegationTokenManager.java | 10 +
.../metastore/security/ZooKeeperTokenStore.java | 47 +-
22 files changed, 2585 insertions(+), 195 deletions(-)
diff --git a/HIVE-23045.10.patch b/HIVE-23045.10.patch
new file mode 100644
index 0000000..ce2fd49
--- /dev/null
+++ b/HIVE-23045.10.patch
@@ -0,0 +1,1742 @@
+diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+index d50912b4e2..34df01e60e 100644
+--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
++++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+@@ -2672,6 +2672,25 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Initial amount of time (in milliseconds) to wait between retries\n" +
+ "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
++ HIVE_ZOOKEEPER_SSL_ENABLE("hive.zookeeper.ssl.client.enable", false,
++ "Set client to use TLS when connecting to ZooKeeper. An explicit value overrides any value set via the " +
++ "zookeeper.client.secure system property (note the different name). Defaults to false if neither is set."),
++ HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION("hive.zookeeper.ssl.keystore.location", "",
++ "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++ "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
++ "system property (note the camelCase)."),
++ HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("hive.zookeeper.ssl.keystore.password", "",
++ "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++ "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
++ "system property (note the camelCase)."),
++ HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("hive.zookeeper.ssl.truststore.location", "",
++ "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++ "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
++ "system property (note the camelCase)."),
++ HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("hive.zookeeper.ssl.truststore.password", "",
++ "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++ "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
++ "system property (note the camelCase)."),
+
+ // Transactions
+ HIVE_TXN_MANAGER("hive.txn.manager",
+@@ -4795,14 +4814,18 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
+ "hive.spark.client.rpc.server.address," +
+ "hive.spark.client.rpc.server.port," +
+ "hive.spark.client.rpc.sasl.mechanisms," +
+- "bonecp.,"+
+- "hive.druid.broker.address.default,"+
+- "hive.druid.coordinator.address.default,"+
+- "hikaricp.,"+
+- "hadoop.bin.path,"+
+- "yarn.bin.path,"+
+- "spark.home,"+
+- "hive.driver.parallel.compilation.global.limit",
++ "bonecp.," +
++ "hive.druid.broker.address.default," +
++ "hive.druid.coordinator.address.default," +
++ "hikaricp.," +
++ "hadoop.bin.path," +
++ "yarn.bin.path," +
++ "spark.home," +
++ "hive.driver.parallel.compilation.global.limit," +
++ "hive.zookeeper.ssl.keystore.location," +
++ "hive.zookeeper.ssl.keystore.password," +
++ "hive.zookeeper.ssl.truststore.location," +
++ "hive.zookeeper.ssl.truststore.password",
+ "Comma separated list of configuration options which are immutable at runtime"),
+ HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
+ METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname
+@@ -4817,7 +4840,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
+ + ",fs.s3a.proxy.password"
+ + ",dfs.adls.oauth2.credential"
+ + ",fs.adl.oauth2.credential"
+- + ",fs.azure.account.oauth2.client.secret",
++ + ",fs.azure.account.oauth2.client.secret"
++ + ",hive.zookeeper.ssl.keystore.location"
++ + ",hive.zookeeper.ssl.keystore.password"
++ + ",hive.zookeeper.ssl.truststore.location"
++ + ",hive.zookeeper.ssl.truststore.password",
+ "Comma separated list of configuration options which should not be read by normal user like passwords"),
+ HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
+ "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
+@@ -5619,14 +5646,22 @@ public void logVars(PrintStream ps) {
+ * given HiveConf.
+ */
+ public ZooKeeperHiveHelper getZKConfig() {
+- return new ZooKeeperHiveHelper(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM),
+- getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT),
+- getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE),
+- (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+- TimeUnit.MILLISECONDS),
+- (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+- TimeUnit.MILLISECONDS),
+- getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES));
++ return ZooKeeperHiveHelper.builder()
++ .quorum(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM))
++ .clientPort(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT))
++ .serverRegistryNameSpace(getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE))
++ .connectionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
++ TimeUnit.MILLISECONDS))
++ .sessionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
++ TimeUnit.MILLISECONDS))
++ .baseSleepTime((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
++ TimeUnit.MILLISECONDS))
++ .maxRetries(getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
++ .sslEnabled(getBoolVar(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
++ .keyStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
++ .keyStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
++ .trustStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
++ .trustStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
+ }
+
+ public HiveConf() {
+diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
+index 1fc8d36394..02a8926ed5 100644
+--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
++++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
+@@ -22,11 +22,11 @@
+ import java.util.ArrayList;
+ import java.util.List;
+
++import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+@@ -50,8 +50,12 @@
+ public String overhead_path = null;
+
+ public static final String ZK_HOSTS = "templeton.zookeeper.hosts";
+- public static final String ZK_SESSION_TIMEOUT
+- = "templeton.zookeeper.session-timeout";
++ public static final String ZK_SESSION_TIMEOUT = "templeton.zookeeper.session-timeout";
++ public static final String ZK_SSL_ENABLE = "templeton.zookeeper.ssl.client.enable";
++ public static final String ZK_KEYSTORE_LOCATION = "templeton.zookeeper.keystore.location";
++ public static final String ZK_KEYSTORE_PASSWORD = "templeton.zookeeper.keystore.password";
++ public static final String ZK_TRUSTSTORE_LOCATION = "templeton.zookeeper.truststore.location";
++ public static final String ZK_TRUSTSTORE_PASSWORD = "templeton.zookeeper.truststore.password";
+
+ public static final String ENCODING = "UTF-8";
+
+@@ -59,27 +63,25 @@
+
+ private CuratorFramework zk;
+
+- /**
+- * Open a ZooKeeper connection for the JobState.
+- */
+- public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
+- throws IOException {
+- //do we need to add a connection status listener? What will that do?
+- ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+- CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
+- CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
+- zk.start();
+- return zk;
+- }
+
+ /**
+ * Open a ZooKeeper connection for the JobState.
+ */
+ public static CuratorFramework zkOpen(Configuration conf) throws IOException {
+- /*the silly looking call to Builder below is to get the default value of session timeout
+- from Curator which itself exposes it as system property*/
+- return zkOpen(conf.get(ZK_HOSTS),
+- conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
++ ZooKeeperHiveHelper xkHelper = ZooKeeperHiveHelper.builder()
++ .quorum(conf.get(ZK_HOSTS))
++ .sessionTimeout(conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()))
++ .baseSleepTime(1000)
++ .maxRetries(3)
++ .sslEnabled(conf.getBoolean(ZK_SSL_ENABLE, false))
++ .keyStoreLocation(conf.get(ZK_KEYSTORE_LOCATION, ""))
++ .keyStorePassword(conf.get(ZK_KEYSTORE_PASSWORD, ""))
++ .trustStoreLocation(conf.get(ZK_TRUSTSTORE_LOCATION, ""))
++ .trustStorePassword(conf.get(ZK_TRUSTSTORE_PASSWORD, ""))
++ .build();
++ CuratorFramework zk = xkHelper.getNewZookeeperClient();
++ zk.start();
++ return zk;
+ }
+
+ public ZooKeeperStorage() {
+diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
+new file mode 100644
+index 0000000000..084e097be9
+--- /dev/null
++++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.metastore.security;
++
++import org.junit.BeforeClass;
++
++/**
++ * TestZookeeperTokenStore with zookeeper SSL communication disabled.
++ */
++public class TestZookeeperTokenStorePlain extends ZooKeeperTokenStoreTestBase {
++
++ public TestZookeeperTokenStorePlain(){
++ super();
++ }
++
++ @BeforeClass
++ public static void setUp() throws Exception {
++ setUpInternal(false);
++ }
++}
+diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
+new file mode 100644
+index 0000000000..40179901ce
+--- /dev/null
++++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.metastore.security;
++
++import org.junit.BeforeClass;
++
++/**
++ * TestZookeeperTokenStore with zookeeper SSL communication enabled.
++ */
++public class TestZookeeperTokenStoreSSLEnabled extends ZooKeeperTokenStoreTestBase {
++
++ public TestZookeeperTokenStoreSSLEnabled(){
++ super();
++ }
++
++ @BeforeClass
++ public static void setUp() throws Exception {
++ setUpInternal(true);
++ }
++}
+diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
+similarity index 76%
+rename from itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
+rename to itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
+index 603155bf8f..35053e70b0 100644
+--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
++++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
+@@ -25,67 +25,84 @@
+
+
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.CuratorFrameworkFactory;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
+-import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+ import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+ import org.apache.hive.testutils.MiniZooKeeperCluster;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.data.ACL;
++import org.junit.AfterClass;
++import org.junit.After;
+ import org.junit.Assert;
++import org.junit.Test;
++
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotSame;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.assertNull;
+ import static org.junit.Assert.fail;
+-import org.junit.Before;
+-import org.junit.After;
+-import org.junit.Test;
+
+ /**
+ * TestZooKeeperTokenStore.
+ */
+-public class TestZooKeeperTokenStore {
++public abstract class ZooKeeperTokenStoreTestBase {
++
++ private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
++ private static final String TRUST_STORE_NAME = "truststore.jks";
++ private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
+
+- private MiniZooKeeperCluster zkCluster = null;
+- private CuratorFramework zkClient = null;
+- private int zkPort = -1;
+- private ZooKeeperTokenStore ts;
++ private static MiniZooKeeperCluster zkCluster = null;
++ private static int zkPort = -1;
++ private static ZooKeeperTokenStore ts;
++ private static boolean zkSslEnabled;
+
+- @Before
+- public void setUp() throws Exception {
++ public static void setUpInternal(boolean sslEnabled) throws Exception{
+ File zkDataDir = new File(System.getProperty("test.tmp.dir"));
+- if (this.zkCluster != null) {
++ if (zkCluster != null) {
+ throw new IOException("Cluster already running");
+ }
+- this.zkCluster = new MiniZooKeeperCluster();
+- this.zkPort = this.zkCluster.startup(zkDataDir);
+- this.zkClient =
+- CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort)
+- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+- this.zkClient.start();
++ zkCluster = new MiniZooKeeperCluster(sslEnabled);
++ zkPort = zkCluster.startup(zkDataDir);
++ zkSslEnabled = sslEnabled;
++ }
++
++ @AfterClass
++ public static void tearDown() throws Exception{
++ zkCluster.shutdown();
++ zkCluster = null;
+ }
+
+ @After
+- public void tearDown() throws Exception {
+- this.zkClient.close();
++ public void closeTokenStore() throws Exception{
+ if (ts != null) {
+ ts.close();
+ }
+- this.zkCluster.shutdown();
+- this.zkCluster = null;
+ }
+
+ private Configuration createConf(String zkPath) {
+ Configuration conf = new Configuration();
+ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:"
+- + this.zkPort);
++ + zkPort);
+ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath);
++ if(zkSslEnabled) {
++ String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
++ System.getProperty("test.data.files") :
++ (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
++ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION,
++ dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
++ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD,
++ KEY_STORE_TRUST_STORE_PASSWORD);
++ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION,
++ dataFileDir + File.separator + TRUST_STORE_NAME);
++ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD,
++ KEY_STORE_TRUST_STORE_PASSWORD);
++ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, "true");
++
++ }
+ return conf;
+ }
+
+@@ -98,6 +115,7 @@ public void testTokenStorage() throws Exception {
+ ts.setConf(conf);
+ ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
+
++ CuratorFramework zkClient = ts.getSession();
+
+ String metastore_zk_path = ZK_PATH + ServerMode.METASTORE;
+ int keySeq = ts.addMasterKey("key1Data");
+@@ -112,6 +130,7 @@ public void testTokenStorage() throws Exception {
+
+ ts.removeMasterKey(keySeq);
+ assertEquals("expected number keys", 1, ts.getMasterKeys().length);
++ ts.removeMasterKey(keySeq2);
+
+ // tokens
+ DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(
+@@ -189,6 +208,8 @@ public void testAclPositive() throws Exception {
+ ts = new ZooKeeperTokenStore();
+ ts.setConf(conf);
+ ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
++
++ CuratorFramework zkClient = ts.getSession();
+ List<ACL> acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE);
+ assertEquals(2, acl.size());
+ }
+diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+index cc32a7e2b8..596c3d6fc1 100644
+--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
++++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+@@ -111,6 +111,10 @@ public static void startServices() throws Exception {
+ addToExpectedRestrictedMap("spark.home");
+ addToExpectedRestrictedMap("hive.privilege.synchronizer.interval");
+ addToExpectedRestrictedMap("hive.driver.parallel.compilation.global.limit");
++ addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.location");
++ addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.password");
++ addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.location");
++ addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.password");
+ }
+
+ @AfterClass
+diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
+index bd5e811743..3322434cb6 100644
+--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
++++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
+@@ -22,11 +22,10 @@
+ import com.google.common.collect.Collections2;
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
++import org.apache.curator.framework.recipes.nodes.PersistentNode;
+ import org.apache.curator.retry.RetryOneTime;
+ import org.apache.curator.test.TestingServer;
+ import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+-import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.junit.After;
+@@ -165,9 +164,8 @@ private void publishConfsToZk(Map<String, String> confs, String uri) throws Exce
+ // Publish configs for this instance as the data on the node
+ znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confs);
+ byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
+- PersistentEphemeralNode znode =
+- new PersistentEphemeralNode(client,
+- PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
++ PersistentNode znode = new PersistentNode(client, CreateMode.EPHEMERAL_SEQUENTIAL,
++ false, pathPrefix, znodeDataUTF8);
+ znode.start();
+ // We'll wait for 120s for node creation
+ long znodeCreationTimeout = 120;
+diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
+similarity index 95%
+rename from itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
+rename to itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
+index de2e4937a8..7302e0993a 100644
+--- itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
++++ itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
+@@ -19,6 +19,7 @@
+ package org.apache.hive.service.server;
+
+ import java.io.File;
++import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.Iterator;
+@@ -54,14 +55,14 @@
+ import org.apache.hive.service.cli.OperationHandle;
+ import org.apache.hive.service.cli.RowSet;
+ import org.apache.hive.service.cli.SessionHandle;
++import org.junit.AfterClass;
+ import org.junit.Assert;
+-import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ /**
+ * Test restricted information schema with privilege synchronization
+ */
+-public class TestInformationSchemaWithPrivilege {
++public abstract class InformationSchemaWithPrivilegeTestBase {
+
+ // Group mapping:
+ // group_a: user1, user2
+@@ -175,14 +176,18 @@ public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory metastoreC
+ }
+ }
+
++ private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
++ private static final String TRUST_STORE_NAME = "truststore.jks";
++ private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
++
+ private static MiniHS2 miniHS2 = null;
+ private static MiniZooKeeperCluster zkCluster = null;
+ private static Map<String, String> confOverlay;
+
+- @BeforeClass
+- public static void beforeTest() throws Exception {
++
++ public static void setupInternal(boolean zookeeperSSLEnabled) throws Exception {
+ File zkDataDir = new File(System.getProperty("test.tmp.dir"));
+- zkCluster = new MiniZooKeeperCluster();
++ zkCluster = new MiniZooKeeperCluster(zookeeperSSLEnabled);
+ int zkPort = zkCluster.startup(zkDataDir);
+
+ miniHS2 = new MiniHS2(new HiveConf());
+@@ -206,9 +211,34 @@ public static void beforeTest() throws Exception {
+ confOverlay.put(ConfVars.HIVE_AUTHENTICATOR_MANAGER.varname, FakeGroupAuthenticator.class.getName());
+ confOverlay.put(ConfVars.HIVE_AUTHORIZATION_ENABLED.varname, "true");
+ confOverlay.put(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST.varname, ".*");
++
++ if(zookeeperSSLEnabled) {
++ String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
++ System.getProperty("test.data.files") :
++ (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
++ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION.varname,
++ dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
++ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
++ KEY_STORE_TRUST_STORE_PASSWORD);
++ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION.varname,
++ dataFileDir + File.separator + TRUST_STORE_NAME);
++ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
++ KEY_STORE_TRUST_STORE_PASSWORD);
++ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE.varname, "true");
++ }
+ miniHS2.start(confOverlay);
+ }
+
++ @AfterClass
++ public static void tearDown() throws IOException {
++ if (miniHS2 != null) {
++ miniHS2.stop();
++ }
++ if (zkCluster != null) {
++ zkCluster.shutdown();
++ }
++ }
++
+ @Test
+ public void test() throws Exception {
+
+diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
+new file mode 100644
+index 0000000000..ffa1843ae2
+--- /dev/null
++++ itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hive.service.server;
++
++import org.junit.BeforeClass;
++
++/**
++ * Test restricted information schema with privilege synchronization with Zookeeper SSL communication disabled.
++ */
++public class TestInformationSchemaWithPrivilegeZookeeperPlain extends InformationSchemaWithPrivilegeTestBase {
++
++ public TestInformationSchemaWithPrivilegeZookeeperPlain() {
++ super();
++ }
++
++ @BeforeClass
++ public static void setUp() throws Exception{
++ setupInternal(false);
++ }
++}
+diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
+new file mode 100644
+index 0000000000..e12f4948d4
+--- /dev/null
++++ itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
+@@ -0,0 +1,35 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hive.service.server;
++
++import org.junit.BeforeClass;
++
++/**
++ * Test restricted information schema with privilege synchronization with Zookeeper SSL communication enabled.
++ */
++public class TestInformationSchemaWithPrivilegeZookeeperSSL extends InformationSchemaWithPrivilegeTestBase {
++
++ public TestInformationSchemaWithPrivilegeZookeeperSSL() {
++ super();
++ }
++
++ @BeforeClass
++ public static void setUp() throws Exception{
++ setupInternal(true);
++ }
++}
+diff --git jdbc/src/java/org/apache/hive/jdbc/Utils.java jdbc/src/java/org/apache/hive/jdbc/Utils.java
+index e23826eb56..6cb6853077 100644
+--- jdbc/src/java/org/apache/hive/jdbc/Utils.java
++++ jdbc/src/java/org/apache/hive/jdbc/Utils.java
+@@ -116,6 +116,11 @@
+ public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
+ public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA";
+ public static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
++ public static final String ZOOKEEPER_SSL_ENABLE = "zooKeeperSSLEnable";
++ public static final String ZOOKEEPER_KEYSTORE_LOCATION = "zooKeeperKeystoreLocation";
++ public static final String ZOOKEEPER_KEYSTORE_PASSWORD= "zooKeeperKeystorePassword";
++ public static final String ZOOKEEPER_TRUSTSTORE_LOCATION = "zooKeeperTruststoreLocation";
++ public static final String ZOOKEEPER_TRUSTSTORE_PASSWORD = "zooKeeperTruststorePassword";
+ // Default namespace value on ZooKeeper.
+ // This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
+ static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
+@@ -168,6 +173,11 @@
+ private boolean isEmbeddedMode = false;
+ private String suppliedURLAuthority;
+ private String zooKeeperEnsemble = null;
++ private boolean zooKeeperSslEnabled = false;
++ private String zookeeperKeyStoreLocation = "";
++ private String zookeeperKeyStorePassword = "";
++ private String zookeeperTrustStoreLocation = "";
++ private String zookeeperTrustStorePassword = "";
+ private String currentHostZnodePath;
+ private final List<String> rejectedHostZnodePaths = new ArrayList<String>();
+
+@@ -185,6 +195,12 @@ public JdbcConnectionParams(JdbcConnectionParams params) {
+ this.isEmbeddedMode = params.isEmbeddedMode;
+ this.suppliedURLAuthority = params.suppliedURLAuthority;
+ this.zooKeeperEnsemble = params.zooKeeperEnsemble;
++ this.zooKeeperSslEnabled = params.zooKeeperSslEnabled;
++ this.zookeeperKeyStoreLocation = params.zookeeperKeyStoreLocation;
++ this.zookeeperKeyStorePassword = params.zookeeperKeyStorePassword;
++ this.zookeeperTrustStoreLocation = params.zookeeperTrustStoreLocation;
++ this.zookeeperTrustStorePassword = params.zookeeperTrustStorePassword;
++
+ this.currentHostZnodePath = params.currentHostZnodePath;
+ this.rejectedHostZnodePaths.addAll(rejectedHostZnodePaths);
+ }
+@@ -228,6 +244,25 @@ public String getSuppliedURLAuthority() {
+ public String getZooKeeperEnsemble() {
+ return zooKeeperEnsemble;
+ }
++ public boolean isZooKeeperSslEnabled() {
++ return zooKeeperSslEnabled;
++ }
++
++ public String getZookeeperKeyStoreLocation() {
++ return zookeeperKeyStoreLocation;
++ }
++
++ public String getZookeeperKeyStorePassword() {
++ return zookeeperKeyStorePassword;
++ }
++
++ public String getZookeeperTrustStoreLocation() {
++ return zookeeperTrustStoreLocation;
++ }
++
++ public String getZookeeperTrustStorePassword() {
++ return zookeeperTrustStorePassword;
++ }
+
+ public List<String> getRejectedHostZnodePaths() {
+ return rejectedHostZnodePaths;
+@@ -277,6 +312,26 @@ public void setZooKeeperEnsemble(String zooKeeperEnsemble) {
+ this.zooKeeperEnsemble = zooKeeperEnsemble;
+ }
+
++ public void setZooKeeperSslEnabled(boolean zooKeeperSslEnabled) {
++ this.zooKeeperSslEnabled = zooKeeperSslEnabled;
++ }
++
++ public void setZookeeperKeyStoreLocation(String zookeeperKeyStoreLocation) {
++ this.zookeeperKeyStoreLocation = zookeeperKeyStoreLocation;
++ }
++
++ public void setZookeeperKeyStorePassword(String zookeeperKeyStorePassword) {
++ this.zookeeperKeyStorePassword = zookeeperKeyStorePassword;
++ }
++
++ public void setZookeeperTrustStoreLocation(String zookeeperTrustStoreLocation) {
++ this.zookeeperTrustStoreLocation = zookeeperTrustStoreLocation;
++ }
++
++ public void setZookeeperTrustStorePassword(String zookeeperTrustStorePassword) {
++ this.zookeeperTrustStorePassword = zookeeperTrustStorePassword;
++ }
++
+ public void setCurrentHostZnodePath(String currentHostZnodePath) {
+ this.currentHostZnodePath = currentHostZnodePath;
+ }
+@@ -485,6 +540,7 @@ public static JdbcConnectionParams extractURLComponents(String uri, Properties i
+ uri = uri.replace(dummyAuthorityString, authorityStr);
+ // Set ZooKeeper ensemble in connParams for later use
+ connParams.setZooKeeperEnsemble(authorityStr);
++ ZooKeeperHiveClientHelper.setZkSSLParams(connParams);
+ } else {
+ URI jdbcBaseURI = URI.create(URI_HIVE_PREFIX + "//" + authorityStr);
+ // Check to prevent unintentional use of embedded mode. A missing "/"
+@@ -576,7 +632,6 @@ private static void handleParamDeprecation(Map<String, String> fromMap, Map<Stri
+ * host:port pairs.
+ *
+ * @param uri
+- * @param connParams
+ * @return
+ * @throws JdbcUriParseException
+ */
+diff --git jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+index 759ba8a5ef..3d89fa223a 100644
+--- jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
++++ jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+@@ -32,6 +32,7 @@
+ import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.curator.utils.ZKPaths;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.common.SSLZookeeperFactory;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
+ import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+@@ -85,11 +86,40 @@ public static boolean isZkDynamicDiscoveryMode(Map<String, String> sessionConf)
+ JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode));
+ }
+
++ /**
++ * Reads the ZooKeeper SSL settings from the session variables of connParams and stores them on connParams.
++ * @param connParams connection parameters whose session variables are read and whose ZooKeeper SSL fields are set
++ */
++ public static void setZkSSLParams(JdbcConnectionParams connParams) {
++ Map<String, String> sessionConf = connParams.getSessionVars();
++ boolean sslEnabled = false;
++ if (sessionConf.containsKey(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE)) { // flag absent: connParams is left untouched
++ sslEnabled = Boolean.parseBoolean(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE));
++ connParams.setZooKeeperSslEnabled(sslEnabled);
++ }
++ if (sslEnabled) { // store settings are copied only when SSL is on; "" is passed as the fallback for absent keys
++ connParams.setZookeeperKeyStoreLocation(
++ StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_LOCATION), ""));
++ connParams.setZookeeperKeyStorePassword(
++ StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_PASSWORD), ""));
++ connParams.setZookeeperTrustStoreLocation(
++ StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_LOCATION), ""));
++ connParams.setZookeeperTrustStorePassword(
++ StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_PASSWORD), ""));
++ }
++ }
++
+ private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception {
+ String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
+ CuratorFramework zooKeeperClient =
+- CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
++ CuratorFrameworkFactory.builder()
++ .connectString(zooKeeperEnsemble)
++ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
++ .zookeeperFactory(
++ new SSLZookeeperFactory(connParams.isZooKeeperSslEnabled(), connParams.getZookeeperKeyStoreLocation(),
++ connParams.getZookeeperKeyStorePassword(), connParams.getZookeeperTrustStoreLocation(),
++ connParams.getZookeeperTrustStorePassword()))
++ .build();
+ zooKeeperClient.start();
+ return zooKeeperClient;
+ }
+diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+index d28fd1778c..2b21baaea4 100644
+--- llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
++++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+@@ -30,20 +30,18 @@
+ import java.util.concurrent.locks.ReentrantLock;
+
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.api.ACLProvider;
+ import org.apache.curator.framework.imps.CuratorFrameworkState;
+ import org.apache.curator.framework.recipes.cache.ChildData;
+ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
++import org.apache.curator.framework.recipes.nodes.PersistentNode;
+ import org.apache.curator.framework.state.ConnectionState;
+ import org.apache.curator.framework.state.ConnectionStateListener;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.curator.utils.CloseableUtils;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+ import org.apache.hadoop.hive.llap.LlapUtil;
+@@ -55,6 +53,7 @@
+ import org.apache.hadoop.registry.client.types.ServiceRecord;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.yarn.conf.YarnConfiguration;
++import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException.InvalidACLException;
+ import org.apache.zookeeper.KeeperException.NodeExistsException;
+ import org.apache.zookeeper.ZooDefs;
+@@ -111,7 +110,7 @@
+ private final Map<String, Set<InstanceType>> nodeToInstanceCache;
+
+ // The registration znode.
+- private PersistentEphemeralNode znode;
++ private PersistentNode znode;
+ private String znodePath; // unique identity for this instance
+
+ private PathChildrenCache instancesCache; // Created on demand.
+@@ -212,28 +211,23 @@ private ACLProvider getACLProviderForZKPath(String zkPath) {
+ }
+
+ private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
+- String zkEnsemble = getQuorumServers(conf);
+- int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+- ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+- int connectionTimeout = (int) HiveConf.getTimeVar(conf,
+- ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+- int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+- ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+- int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+-
+- LOG.info("Creating curator client with connectString: {} sessionTimeoutMs: {} connectionTimeoutMs: {}" +
+- " namespace: {} exponentialBackoff - sleepTime: {} maxRetries: {}", zkEnsemble, sessionTimeout,
+- connectionTimeout, namespace, baseSleepTime, maxRetries);
+- // Create a CuratorFramework instance to be used as the ZooKeeper client
+- // Use the zooKeeperAclProvider to create appropriate ACLs
+- return CuratorFrameworkFactory.builder()
+- .connectString(zkEnsemble)
+- .sessionTimeoutMs(sessionTimeout)
+- .connectionTimeoutMs(connectionTimeout)
+- .aclProvider(zooKeeperAclProvider)
+- .namespace(namespace)
+- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+- .build();
++ return ZooKeeperHiveHelper.builder()
++ .quorum(conf.get(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname))
++ .clientPort(conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
++ ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()))
++ .connectionTimeout(
++ (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
++ .sessionTimeout(
++ (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS))
++ .baseSleepTime(
++ (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS))
++ .maxRetries(HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
++ .sslEnabled(HiveConf.getBoolVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
++ .keyStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
++ .keyStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
++ .trustStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
++ .trustStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD))
++ .build().getNewZookeeperClient(zooKeeperAclProvider, namespace);
+ }
+
+ private static List<ACL> createSecureAcls() {
+@@ -283,9 +277,9 @@ protected final String registerServiceRecord(ServiceRecord srv, final String uni
+
+ // Create a znode under the rootNamespace parent for this instance of the server
+ try {
+- // PersistentEphemeralNode will make sure the ephemeral node created on server will be present
++ // PersistentNode will make sure the ephemeral node created on server will be present
+ // even under connection or session interruption (will automatically handle retries)
+- znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
++ znode = new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false,
+ workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
+
+ // start the creation of znodes
+diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
+index fa3a382367..e8eaac0fd9 100644
+--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
++++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
+@@ -18,14 +18,11 @@
+
+ package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
+
+-import java.util.concurrent.TimeUnit;
+
+ import org.apache.hive.common.util.ShutdownHookManager;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.CuratorFrameworkFactory;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.hive.conf.HiveConf;
+
+ public class CuratorFrameworkSingleton {
+@@ -50,15 +47,8 @@ public static synchronized CuratorFramework getInstance(HiveConf hiveConf) {
+ } else {
+ conf = hiveConf;
+ }
+- int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+- int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+- int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+- String quorumServers = conf.getZKConfig().getQuorumServers();
+
+- sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers)
+- .sessionTimeoutMs(sessionTimeout)
+- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+- .build();
++ sharedClient = conf.getZKConfig().getNewZookeeperClient();
+ sharedClient.start();
+ }
+
+diff --git ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
+index eec628263a..dc11ae1611 100644
+--- ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
++++ ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
+@@ -34,7 +34,9 @@
+
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hbase.HConstants;
+-import org.apache.zookeeper.server.NIOServerCnxnFactory;
++import org.apache.zookeeper.common.ClientX509Util;
++import org.apache.zookeeper.common.X509Util;
++import org.apache.zookeeper.server.ServerCnxnFactory;
+ import org.apache.zookeeper.server.ZooKeeperServer;
+ import org.apache.zookeeper.server.persistence.FileTxnLog;
+ import org.slf4j.Logger;
+@@ -54,6 +56,9 @@
+
+ private static final int TICK_TIME = 2000;
+ private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
++ private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
++ private static final String TRUST_STORE_NAME = "truststore.jks";
++ private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
+ private int connectionTimeout;
+
+ private boolean started;
+@@ -61,7 +66,7 @@
+ /** The default port. If zero, we use a random port. */
+ private int defaultClientPort = 0;
+
+- private List<NIOServerCnxnFactory> standaloneServerFactoryList;
++ private List<ServerCnxnFactory> standaloneServerFactoryList;
+ private List<ZooKeeperServer> zooKeeperServers;
+ private List<Integer> clientPortList;
+
+@@ -70,11 +75,20 @@
+
+ private Configuration configuration;
+
++ private boolean sslEnabled = false;
++
+ public MiniZooKeeperCluster() {
+ this(new Configuration());
+ }
+-
++ public MiniZooKeeperCluster(boolean sslEnabled) {
++ this(new Configuration(), sslEnabled);
++ }
+ public MiniZooKeeperCluster(Configuration configuration) {
++ this(configuration, false);
++ }
++
++ public MiniZooKeeperCluster(Configuration configuration, boolean sslEnabled) {
++ this.sslEnabled = sslEnabled;
+ this.started = false;
+ this.configuration = configuration;
+ activeZKServerIndex = -1;
+@@ -167,7 +181,7 @@ public int getZooKeeperServerNum() {
+ }
+
+ // / XXX: From o.a.zk.t.ClientBase
+- private static void setupTestEnv() {
++ private static void setupTestEnv(boolean sslEnabled) {
+ // With ZooKeeper 3.5 we need to whitelist the 4 letter commands we use
+ System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+
+@@ -176,6 +190,9 @@ private static void setupTestEnv() {
+ // resulting in test failure (client timeout on first session).
+ // set env and directly in order to handle static init/gc issues
+ System.setProperty("zookeeper.preAllocSize", "100");
++ if (sslEnabled) {
++ System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
++ }
+ FileTxnLog.setPreallocSize(100 * 1024);
+ }
+
+@@ -200,7 +217,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
+ return -1;
+ }
+
+- setupTestEnv();
++ setupTestEnv(sslEnabled);
+ shutdown();
+
+ int tentativePort = -1; // the seed port
+@@ -229,12 +246,10 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
+ // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
+ server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
+ server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
+- NIOServerCnxnFactory standaloneServerFactory;
++ ServerCnxnFactory standaloneServerFactory;
+ while (true) {
+ try {
+- standaloneServerFactory = new NIOServerCnxnFactory();
+- standaloneServerFactory.configure(new InetSocketAddress(currentClientPort),
+- configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
++ standaloneServerFactory = createServerCnxnFactory(currentClientPort);
+ } catch (BindException e) {
+ LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
+ // We're told to use some port but it's occupied, fail
+@@ -252,7 +267,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
+ // Start up this ZK server
+ standaloneServerFactory.startup(server);
+ // Runs a 'stat' against the servers.
+- if (!waitForServerUp(currentClientPort, connectionTimeout)) {
++ if (!sslEnabled && !waitForServerUp(currentClientPort, connectionTimeout)) {
+ throw new IOException("Waiting for startup of standalone server");
+ }
+
+@@ -276,6 +291,36 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException, In
+ return clientPort;
+ }
+
++ private ServerCnxnFactory createServerCnxnFactory(int currentClientPort) throws IOException {
++ ServerCnxnFactory serverCnxnFactory = null;
++ if (sslEnabled) { // SSL: select the Netty factory and publish the test key/trust stores via system properties
++ System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, // JVM-wide; not reset by this method
++ "org.apache.zookeeper.server.NettyServerCnxnFactory");
++ String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
++ System.getProperty("test.data.files") :
++ configuration.get("test.data.files").replace('\\', '/').replace("c:", ""); // NOTE(review): NPE if the key is unset in configuration too - confirm
++ X509Util x509Util = new ClientX509Util();
++ System.setProperty(x509Util.getSslKeystoreLocationProperty(),
++ dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
++ System.setProperty(x509Util.getSslKeystorePasswdProperty(),
++ KEY_STORE_TRUST_STORE_PASSWORD);
++ System.setProperty(x509Util.getSslTruststoreLocationProperty(),
++ dataFileDir + File.separator + TRUST_STORE_NAME);
++ System.setProperty(x509Util.getSslTruststorePasswdProperty(),
++ KEY_STORE_TRUST_STORE_PASSWORD);
++ serverCnxnFactory = ServerCnxnFactory.createFactory();
++ serverCnxnFactory.configure(new InetSocketAddress(currentClientPort),
++ configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS),
++ true); // NOTE(review): third argument presumably marks the factory as secure - confirm
++ } else { // NOTE(review): createFactory() presumably honours the factory property set by an earlier SSL cluster in this JVM - confirm
++ serverCnxnFactory = ServerCnxnFactory.createFactory();
++ serverCnxnFactory.configure(new InetSocketAddress(currentClientPort),
++ configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
++ }
++ return serverCnxnFactory;
++ }
++
++
+ private void createDir(File dir) throws IOException {
+ try {
+ if (!dir.exists()) {
+@@ -292,7 +337,7 @@ private void createDir(File dir) throws IOException {
+ public void shutdown() throws IOException {
+ // shut down all the zk servers
+ for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+- NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
++ ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
+ int clientPort = clientPortList.get(i);
+
+ standaloneServerFactory.shutdown();
+@@ -305,6 +350,7 @@ public void shutdown() throws IOException {
+ for (ZooKeeperServer zkServer : zooKeeperServers) {
+ //explicitly close ZKDatabase since ZookeeperServer does not close them
+ zkServer.getZKDatabase().close();
++ zkServer.shutdown(true);
+ }
+ zooKeeperServers.clear();
+
+@@ -328,7 +374,7 @@ public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedExc
+ }
+
+ // Shutdown the current active one
+- NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
++ ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
+ int clientPort = clientPortList.get(activeZKServerIndex);
+
+ standaloneServerFactory.shutdown();
+@@ -366,7 +412,7 @@ public void killOneBackupZooKeeperServer() throws IOException, InterruptedExcept
+
+ int backupZKServerIndex = activeZKServerIndex + 1;
+ // Shutdown the current active one
+- NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
++ ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
+ int clientPort = clientPortList.get(backupZKServerIndex);
+
+ standaloneServerFactory.shutdown();
+diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java
+index fece82e266..181ea5d6d5 100644
+--- service/src/java/org/apache/hive/service/server/HiveServer2.java
++++ service/src/java/org/apache/hive/service/server/HiveServer2.java
+@@ -43,14 +43,12 @@
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.api.ACLProvider;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorEventType;
+ import org.apache.curator.framework.recipes.leader.LeaderLatch;
+ import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.common.JvmPauseMonitor;
+ import org.apache.hadoop.hive.common.LogUtils;
+@@ -1078,14 +1076,9 @@ private void maybeStartCompactorThreads(HiveConf hiveConf) throws Exception {
+ */
+ static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
+ HiveConf hiveConf = new HiveConf();
+- String zooKeeperEnsemble = hiveConf.getZKConfig().getQuorumServers();
+- String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+- int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+- int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+- CuratorFramework zooKeeperClient =
+- CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
++ CuratorFramework zooKeeperClient = hiveConf.getZKConfig().getNewZookeeperClient();
+ zooKeeperClient.start();
++ String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+ List<String> znodePaths =
+ zooKeeperClient.getChildren().forPath(
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
+new file mode 100644
+index 0000000000..ee01731fa9
+--- /dev/null
++++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
+@@ -0,0 +1,78 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.hive.common;
++
++import org.apache.commons.lang3.StringUtils;
++import org.apache.curator.utils.ZookeeperFactory;
++import org.apache.zookeeper.Watcher;
++import org.apache.zookeeper.ZooKeeper;
++import org.apache.zookeeper.client.ZKClientConfig;
++import org.apache.zookeeper.common.ClientX509Util;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * Curator ZookeeperFactory that creates Zookeeper clients with zookeeper.client.secure enabled (Netty socket
++ * plus key/trust store settings) when SSL is on, and plain clients otherwise. Instances are immutable.
++ */
++public class SSLZookeeperFactory implements ZookeeperFactory {
++
++ private static final Logger LOG = LoggerFactory.getLogger(SSLZookeeperFactory.class);
++
++ private final boolean sslEnabled;
++ private final String keyStoreLocation;
++ private final String keyStorePassword;
++ private final String trustStoreLocation;
++ private final String trustStorePassword;
++ /** Stores the settings as given; with SSL enabled, an empty key/trust store location is only logged as a warning. */
++ public SSLZookeeperFactory(boolean sslEnabled, String keyStoreLocation, String keyStorePassword,
++ String trustStoreLocation, String trustStorePassword) {
++
++ this.sslEnabled = sslEnabled;
++ this.keyStoreLocation = keyStoreLocation;
++ this.keyStorePassword = keyStorePassword;
++ this.trustStoreLocation = trustStoreLocation;
++ this.trustStorePassword = trustStorePassword;
++ if (sslEnabled) {
++ if (StringUtils.isEmpty(keyStoreLocation)) {
++ LOG.warn("Missing keystoreLocation parameter");
++ }
++ if (StringUtils.isEmpty(trustStoreLocation)) {
++ LOG.warn("Missing trustStoreLocation parameter");
++ }
++ }
++ }
++ /** Returns a plain ZooKeeper client when SSL is disabled, otherwise one built from an SSL-enabled ZKClientConfig. */
++ @Override
++ public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
++ boolean canBeReadOnly) throws Exception {
++ if (!this.sslEnabled) {
++ return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
++ }
++ ZKClientConfig clientConfig = new ZKClientConfig(); // per-client config, so no JVM-wide system properties are touched
++ clientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
++ clientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
++ ClientX509Util x509Util = new ClientX509Util(); // used here only to obtain the SSL property names
++ clientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(), this.keyStoreLocation);
++ clientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(), this.keyStorePassword);
++ clientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(), this.trustStoreLocation);
++ clientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(), this.trustStorePassword);
++ return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, clientConfig);
++ }
++}
+diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+index 99f7c97877..71d8651712 100644
+--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
++++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+@@ -23,10 +23,11 @@
+ import java.util.ArrayList;
+ import java.util.concurrent.TimeUnit;
+ import java.util.List;
++
+ import org.apache.curator.framework.api.ACLProvider;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.CuratorFramework;
+-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
++import org.apache.curator.framework.recipes.nodes.PersistentNode;
+ import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+@@ -55,28 +56,164 @@
+ public static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHiveHelper.class.getName());
+ public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
+
+- private String quorum = null;
+- private String clientPort = null;
+- private String rootNamespace = null;
+- private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
++ /**
++ * ZooKeeperHiveHelperBuilder. A fluent builder holding the settings passed to the ZooKeeperHiveHelper constructor.
++ */
++ public static class ZooKeeperHiveHelperBuilder {
++ private String quorum = null;
++ private String clientPort = null;
++ private String serverRegistryNameSpace = null;
++ private int connectionTimeout; // int settings stay 0 unless the matching setter is called
++ private int sessionTimeout;
++ private int baseSleepTime;
++ private int maxRetries;
++ private boolean sslEnabled = false;
++ private String keyStoreLocation = null; // store settings stay null unless set explicitly
++ private String keyStorePassword = null;
++ private String trustStoreLocation = null;
++ private String trustStorePassword = null;
++ // Creates the helper from the values collected so far; every setter below returns this builder for chaining.
++ public ZooKeeperHiveHelper build() {
++ return new ZooKeeperHiveHelper(this);
++ }
++
++ public ZooKeeperHiveHelperBuilder quorum(String quorum) {
++ this.quorum = quorum;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder clientPort(String clientPort) {
++ this.clientPort = clientPort;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder serverRegistryNameSpace(String serverRegistryNameSpace) {
++ this.serverRegistryNameSpace = serverRegistryNameSpace;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder connectionTimeout(int connectionTimeout) {
++ this.connectionTimeout = connectionTimeout;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder sessionTimeout(int sessionTimeout) {
++ this.sessionTimeout = sessionTimeout;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder baseSleepTime(int baseSleepTime) {
++ this.baseSleepTime = baseSleepTime;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder maxRetries(int maxRetries) {
++ this.maxRetries = maxRetries;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder sslEnabled(boolean sslEnabled) {
++ this.sslEnabled = sslEnabled;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder keyStoreLocation(String keyStoreLocation) {
++ this.keyStoreLocation = keyStoreLocation;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder keyStorePassword(String keyStorePassword) {
++ this.keyStorePassword = keyStorePassword;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder trustStoreLocation(String trustStoreLocation) {
++ this.trustStoreLocation = trustStoreLocation;
++ return this;
++ }
++
++ public ZooKeeperHiveHelperBuilder trustStorePassword(String trustStorePassword) {
++ this.trustStorePassword = trustStorePassword;
++ return this;
++ }
++
++ public String getQuorum() {
++ return quorum;
++ }
++
++ public String getClientPort() {
++ return clientPort;
++ }
++
++ public String getServerRegistryNameSpace() {
++ return serverRegistryNameSpace;
++ }
++
++ public int getConnectionTimeout() {
++ return connectionTimeout;
++ }
++
++ public int getSessionTimeout() {
++ return sessionTimeout;
++ }
++
++ public int getBaseSleepTime() {
++ return baseSleepTime;
++ }
++
++ public int getMaxRetries() {
++ return maxRetries;
++ }
++
++ public boolean isSslEnabled() {
++ return sslEnabled;
++ }
++
++ public String getKeyStoreLocation() {
++ return keyStoreLocation;
++ }
++
++ public String getKeyStorePassword() {
++ return keyStorePassword;
++ }
++
++ public String getTrustStoreLocation() {
++ return trustStoreLocation;
++ }
++
++ public String getTrustStorePassword() {
++ return trustStorePassword;
++ }
++ }
++
++ public static ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder builder() {
++ return new ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder();
++ }
++
++ private String quorum;
++ private String rootNamespace;
++ private int connectionTimeout;
+ private int sessionTimeout;
+ private int baseSleepTime;
+ private int maxRetries;
++ private boolean sslEnabled;
+
++ private SSLZookeeperFactory sslZookeeperFactory;
+ private CuratorFramework zooKeeperClient;
+- private PersistentEphemeralNode znode;
++ private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
++ private PersistentNode znode;
++
+
+- public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespace,
+- int sessionTimeout, int baseSleepTime, int maxRetries) {
++ public ZooKeeperHiveHelper(ZooKeeperHiveHelperBuilder builder) {
+ // Get the ensemble server addresses in the format host1:port1, host2:port2, ... . Append
+ // the configured port to hostname if the hostname doesn't contain a port.
+- String[] hosts = quorum.split(",");
++ String[] hosts = builder.getQuorum().split(",");
+ StringBuilder quorumServers = new StringBuilder();
+ for (int i = 0; i < hosts.length; i++) {
+ quorumServers.append(hosts[i].trim());
+ if (!hosts[i].contains(":")) {
+ quorumServers.append(":");
+- quorumServers.append(clientPort);
++ quorumServers.append(builder.getClientPort());
+ }
+
+ if (i != hosts.length - 1) {
+@@ -85,11 +222,19 @@ public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespac
+ }
+
+ this.quorum = quorumServers.toString();
+- this.clientPort = clientPort;
+- this.rootNamespace = rootNamespace;
+- this.sessionTimeout = sessionTimeout;
+- this.baseSleepTime = baseSleepTime;
+- this.maxRetries = maxRetries;
++ this.rootNamespace = builder.getServerRegistryNameSpace();
++ this.connectionTimeout = builder.getConnectionTimeout();
++ this.sessionTimeout = builder.getSessionTimeout();
++ this.baseSleepTime = builder.getBaseSleepTime();
++ this.maxRetries = builder.getMaxRetries();
++ this.sslEnabled = builder.isSslEnabled();
++ this.sslZookeeperFactory =
++ new SSLZookeeperFactory(sslEnabled,
++ builder.getKeyStoreLocation(),
++ builder.getKeyStorePassword(),
++ builder.getTrustStoreLocation(),
++ builder.getTrustStorePassword());
++
+ }
+
+ /**
+@@ -118,8 +263,7 @@ public void addServerInstanceToZooKeeper(String znodePathPrefix, String znodeDat
+ + ZOOKEEPER_PATH_SEPARATOR + znodePathPrefix;
+ byte[] znodeDataUTF8 = znodeData.getBytes(StandardCharsets.UTF_8);
+ znode =
+- new PersistentEphemeralNode(zooKeeperClient,
+- PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
++ new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false, pathPrefix, znodeDataUTF8);
+ znode.start();
+ // We'll wait for 120s for node creation
+ long znodeCreationTimeout = 120;
+@@ -147,17 +291,7 @@ public void addServerInstanceToZooKeeper(String znodePathPrefix, String znodeDat
+
+ public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
+ boolean addParentNode) throws Exception {
+- String zooKeeperEnsemble = getQuorumServers();
+- // Create a CuratorFramework instance to be used as the ZooKeeper client.
+- // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
+- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+- .connectString(zooKeeperEnsemble)
+- .sessionTimeoutMs(sessionTimeout)
+- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
+- if (zooKeeperAclProvider != null) {
+- builder = builder.aclProvider(zooKeeperAclProvider);
+- }
+- CuratorFramework zkClient = builder.build();
++ CuratorFramework zkClient = getNewZookeeperClient(zooKeeperAclProvider);
+ zkClient.start();
+
+ // Create the parent znodes recursively; ignore if the parent already exists.
+@@ -177,6 +311,39 @@ public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
+ }
+ return zkClient;
+ }
++ public CuratorFramework getNewZookeeperClient() {
++ return getNewZookeeperClient(null, null);
++ }
++ public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider) {
++ return getNewZookeeperClient(zooKeeperAclProvider, null);
++ }
++
++ public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider, String nameSpace) {
++ LOG.info("Creating curator client with connectString: {} namespace: {} sessionTimeoutMs: {}" +
++ " connectionTimeoutMs: {} exponentialBackoff - sleepTime: {} maxRetries: {} sslEnabled: {}",
++ quorum, nameSpace, sessionTimeout,
++ connectionTimeout, baseSleepTime, maxRetries, sslEnabled);
++ // Build (but do not start) a CuratorFramework to be used as the ZooKeeper client; the caller starts it.
++ // Timeouts are applied only when positive; the ACL provider only when one is supplied.
++ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
++ .connectString(quorum)
++ .namespace(nameSpace)
++ .zookeeperFactory(this.sslZookeeperFactory);
++ if (connectionTimeout > 0) {
++ builder = builder.connectionTimeoutMs(connectionTimeout);
++ }
++ if (sessionTimeout > 0) {
++ builder = builder.sessionTimeoutMs(sessionTimeout);
++ }
++ // Curator's build() rejects a missing retry policy, so always install one;
++ // a non-positive maxRetries simply means "never retry" instead of failing at build time.
++ builder = builder.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, Math.max(maxRetries, 0)));
++ if (zooKeeperAclProvider != null) {
++ builder = builder.aclProvider(zooKeeperAclProvider);
++ }
++
++ return builder.build();
++ }
+
+ public void removeServerInstanceFromZooKeeper() throws Exception {
+ setDeregisteredWithZooKeeper(true);
+@@ -185,7 +352,9 @@ public void removeServerInstanceFromZooKeeper() throws Exception {
+ znode.close();
+ znode = null;
+ }
+- zooKeeperClient.close();
++ if (zooKeeperClient != null) {
++ zooKeeperClient.close();
++ }
+ LOG.info("Server instance removed from ZooKeeper.");
+ }
+
+diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+index fc6a2fd43a..d6a6c96a6a 100644
+--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
++++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+@@ -258,7 +258,11 @@ public String toString() {
+ ConfVars.SSL_TRUSTSTORE_PASSWORD.varname,
+ ConfVars.SSL_TRUSTSTORE_PASSWORD.hiveName,
+ ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.varname,
+- ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName
++ ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName,
++ ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
++ ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.hiveName,
++ ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
++ ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.hiveName
+ );
+
+ public static ConfVars getMetaConf(String name) {
+@@ -1072,32 +1076,56 @@ public static ConfVars getMetaConf(String name) {
+ "If ZooKeeper is configured for Kerberos authentication. This could be useful when cluster\n" +
+ "is kerberized, but Zookeeper is not."),
+ THRIFT_ZOOKEEPER_CLIENT_PORT("metastore.zookeeper.client.port",
+- "hive.metastore.zookeeper.client.port", "2181",
++ "hive.zookeeper.client.port", "2181",
+ "The port of ZooKeeper servers to talk to.\n" +
+ "If the list of Zookeeper servers specified in hive.metastore.thrift.uris" +
+ " does not contain port numbers, this value is used."),
+ THRIFT_ZOOKEEPER_SESSION_TIMEOUT("metastore.zookeeper.session.timeout",
+- "hive.metastore.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
++ "hive.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "ZooKeeper client's session timeout (in milliseconds). The client is disconnected\n" +
+ "if a heartbeat is not sent in the timeout."),
+ THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT("metastore.zookeeper.connection.timeout",
+- "hive.metastore.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
++ "hive.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
+ new TimeValidator(TimeUnit.SECONDS),
+ "ZooKeeper client's connection timeout in seconds. " +
+ "Connection timeout * hive.metastore.zookeeper.connection.max.retries\n" +
+ "with exponential backoff is when curator client deems connection is lost to zookeeper."),
+ THRIFT_ZOOKEEPER_NAMESPACE("metastore.zookeeper.namespace",
+- "hive.metastore.zookeeper.namespace", "hive_metastore",
++ "hive.zookeeper.namespace", "hive_metastore",
+ "The parent node under which all ZooKeeper nodes for metastores are created."),
+ THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES("metastore.zookeeper.connection.max.retries",
+- "hive.metastore.zookeeper.connection.max.retries", 3,
++ "hive.zookeeper.connection.max.retries", 3,
+ "Max number of times to retry when connecting to the ZooKeeper server."),
+ THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME("metastore.zookeeper.connection.basesleeptime",
+- "hive.metastore.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
++ "hive.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Initial amount of time (in milliseconds) to wait between retries\n" +
+ "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
++ THRIFT_ZOOKEEPER_SSL_ENABLE("metastore.zookeeper.ssl.client.enable",
++ "hive.zookeeper.ssl.client.enable", false,
++ "Set client to use TLS when connecting to ZooKeeper. An explicit value overrides any value set via the " +
++ "zookeeper.client.secure system property (note the different name). Defaults to false if neither is set."),
++ THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION("metastore.zookeeper.ssl.keystore.location",
++ "hive.zookeeper.ssl.keystore.location", "",
++ "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++ "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
++ "system property (note the camelCase)."),
++ THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("metastore.zookeeper.ssl.keystore.password",
++ "hive.zookeeper.ssl.keystore.password", "",
++ "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++ "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
++ "system property (note the camelCase)."),
++ THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("metastore.zookeeper.ssl.truststore.location",
++ "hive.zookeeper.ssl.truststore.location", "",
++ "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++ "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
++ "system property (note the camelCase)."),
++ THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("metastore.zookeeper.ssl.truststore.password",
++ "hive.zookeeper.ssl.truststore.password", "",
++ "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
++ "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
++ "system property (note the camelCase)."),
+ THRIFT_URI_SELECTION("metastore.thrift.uri.selection", "hive.metastore.uri.selection", "RANDOM",
+ new StringSetValidator("RANDOM", "SEQUENTIAL"),
+ "Determines the selection mechanism used by metastore client to connect to remote " +
+@@ -2018,14 +2046,22 @@ public static boolean isEmbeddedMetaStore(String msUri) {
+ }
+
+ public static ZooKeeperHiveHelper getZKConfig(Configuration conf) {
+- return new ZooKeeperHiveHelper(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS),
+- MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT),
+- MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE),
+- (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
+- TimeUnit.MILLISECONDS),
+- (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+- TimeUnit.MILLISECONDS),
+- MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES));
++ return ZooKeeperHiveHelper.builder()
++ .quorum(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS))
++ .clientPort(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT))
++ .serverRegistryNameSpace(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE))
++ .connectionTimeout((int) getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT,
++ TimeUnit.MILLISECONDS))
++ .sessionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
++ TimeUnit.MILLISECONDS))
++ .baseSleepTime((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
++ TimeUnit.MILLISECONDS))
++ .maxRetries(MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES))
++ .sslEnabled(MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_ENABLE))
++ .keyStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
++ .keyStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
++ .trustStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
++ .trustStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
+ }
+
+ /**
+diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
+index b35dc7ce4b..239bff6dc9 100644
+--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
++++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
+@@ -46,6 +46,16 @@
+ public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+ "hive.cluster.delegation.token.store.zookeeper.acl";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation";
++ public static final String DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE =
++ "hive.cluster.delegation.token.store.zookeeper.ssl.client.enable";
++ public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION =
++ "hive.cluster.delegation.token.store.zookeeper.keystore.location";
++ public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD =
++ "hive.cluster.delegation.token.store.zookeeper.keystore.password";
++ public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION =
++ "hive.cluster.delegation.token.store.zookeeper.truststore.location";
++ public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD =
++ "hive.cluster.delegation.token.store.zookeeper.truststore.password";
+
+ public MetastoreDelegationTokenManager() {
+ }
+diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
+index af52fcc5f6..dd2af7ef1f 100644
+--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
++++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
+@@ -29,8 +29,8 @@
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.api.ACLProvider;
+ import org.apache.curator.framework.imps.CuratorFrameworkState;
+-import org.apache.curator.retry.ExponentialBackoffRetry;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+ import org.apache.hadoop.security.UserGroupInformation;
+@@ -57,12 +57,22 @@
+ protected static final String ZK_SEQ_FORMAT = "%010d";
+ private static final String NODE_KEYS = "/keys";
+ private static final String NODE_TOKENS = "/tokens";
++ private static final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
++ + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+
+ private String rootNode = "";
+ private volatile CuratorFramework zkSession;
+ private String zkConnectString;
+ private int connectTimeoutMillis;
++ private boolean sslEnabled;
++ private String keyStoreLocation;
++ private String keyStorePassword;
++ private String trustStoreLocation;
++ private String trustStorePassword;
++
+ private List<ACL> newNodeAcl;
++ private Configuration conf;
++ private HadoopThriftAuthBridge.Server.ServerMode serverMode;
+
+ /**
+ * ACLProvider permissions will be used in case parent dirs need to be created
+@@ -110,12 +120,7 @@ private boolean isKerberosEnabled(Configuration conf) {
+ }
+ }
+
+- private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
+- + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+-
+- private Configuration conf;
+
+- private HadoopThriftAuthBridge.Server.ServerMode serverMode;
+
+ /**
+ * Default constructor for dynamic instantiation w/ Configurable
+@@ -124,14 +129,22 @@ private boolean isKerberosEnabled(Configuration conf) {
+ protected ZooKeeperTokenStore() {
+ }
+
+- private CuratorFramework getSession() {
++ public CuratorFramework getSession() {
+ if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+ synchronized (this) {
+ if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+- zkSession =
+- CuratorFrameworkFactory.builder().connectString(zkConnectString)
+- .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
+- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
++ ZooKeeperHiveHelper zkHelper = ZooKeeperHiveHelper.builder()
++ .quorum(zkConnectString)
++ .connectionTimeout(connectTimeoutMillis)
++ .maxRetries(3)
++ .baseSleepTime(1000)
++ .sslEnabled(sslEnabled)
++ .keyStoreLocation(keyStoreLocation)
++ .keyStorePassword(keyStorePassword)
++ .trustStoreLocation(trustStoreLocation)
++ .trustStorePassword(trustStorePassword)
++ .build();
++ zkSession = zkHelper.getNewZookeeperClient(aclDefaultProvider);
+ zkSession.start();
+ }
+ }
+@@ -478,10 +491,14 @@ public void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode sMo
+ + WHEN_ZK_DSTORE_MSG);
+ }
+ }
+- connectTimeoutMillis =
+- conf.getInt(
+- MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+- CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
++ connectTimeoutMillis = conf.getInt(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
++ CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
++
++ sslEnabled = conf.getBoolean(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, false);
++ keyStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION, "");
++ keyStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD, "");
++ trustStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION, "");
++ trustStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD, "");
+
+ String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+ this.newNodeAcl = StringUtils.isNotBlank(aclStr)? parseACLs(aclStr) : getDefaultAcl(conf);
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d50912b..34df01e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2672,6 +2672,25 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS),
"Initial amount of time (in milliseconds) to wait between retries\n" +
"when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
+ HIVE_ZOOKEEPER_SSL_ENABLE("hive.zookeeper.ssl.client.enable", false,
+ "Set client to use TLS when connecting to ZooKeeper. An explicit value overrides any value set via the " +
+ "zookeeper.client.secure system property (note the different name). Defaults to false if neither is set."),
+ HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION("hive.zookeeper.ssl.keystore.location", "",
+ "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+ "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
+ "system property (note the camelCase)."),
+ HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("hive.zookeeper.ssl.keystore.password", "",
+ "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+ "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
+ "system property (note the camelCase)."),
+ HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("hive.zookeeper.ssl.truststore.location", "",
+ "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+ "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
+ "system property (note the camelCase)."),
+ HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("hive.zookeeper.ssl.truststore.password", "",
+ "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+ "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
+ "system property (note the camelCase)."),
// Transactions
HIVE_TXN_MANAGER("hive.txn.manager",
@@ -4795,14 +4814,18 @@ public class HiveConf extends Configuration {
"hive.spark.client.rpc.server.address," +
"hive.spark.client.rpc.server.port," +
"hive.spark.client.rpc.sasl.mechanisms," +
- "bonecp.,"+
- "hive.druid.broker.address.default,"+
- "hive.druid.coordinator.address.default,"+
- "hikaricp.,"+
- "hadoop.bin.path,"+
- "yarn.bin.path,"+
- "spark.home,"+
- "hive.driver.parallel.compilation.global.limit",
+ "bonecp.," +
+ "hive.druid.broker.address.default," +
+ "hive.druid.coordinator.address.default," +
+ "hikaricp.," +
+ "hadoop.bin.path," +
+ "yarn.bin.path," +
+ "spark.home," +
+ "hive.driver.parallel.compilation.global.limit," +
+ "hive.zookeeper.ssl.keystore.location," +
+ "hive.zookeeper.ssl.keystore.password," +
+ "hive.zookeeper.ssl.truststore.location," +
+ "hive.zookeeper.ssl.truststore.password",
"Comma separated list of configuration options which are immutable at runtime"),
HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname
@@ -4817,7 +4840,11 @@ public class HiveConf extends Configuration {
+ ",fs.s3a.proxy.password"
+ ",dfs.adls.oauth2.credential"
+ ",fs.adl.oauth2.credential"
- + ",fs.azure.account.oauth2.client.secret",
+ + ",fs.azure.account.oauth2.client.secret"
+ + ",hive.zookeeper.ssl.keystore.location"
+ + ",hive.zookeeper.ssl.keystore.password"
+ + ",hive.zookeeper.ssl.truststore.location"
+ + ",hive.zookeeper.ssl.truststore.password",
"Comma separated list of configuration options which should not be read by normal user like passwords"),
HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
"hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
@@ -5619,14 +5646,22 @@ public class HiveConf extends Configuration {
* given HiveConf.
*/
public ZooKeeperHiveHelper getZKConfig() {
- return new ZooKeeperHiveHelper(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM),
- getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT),
- getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE),
- (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
- TimeUnit.MILLISECONDS),
- (int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
- TimeUnit.MILLISECONDS),
- getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES));
+ return ZooKeeperHiveHelper.builder()
+ .quorum(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM))
+ .clientPort(getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT))
+ .serverRegistryNameSpace(getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE))
+ .connectionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS))
+ .sessionTimeout((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+ TimeUnit.MILLISECONDS))
+ .baseSleepTime((int) getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+ TimeUnit.MILLISECONDS))
+ .maxRetries(getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
+ .sslEnabled(getBoolVar(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
+ .keyStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
+ .keyStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
+ .trustStoreLocation(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
+ .trustStorePassword(getVar(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
}
public HiveConf() {
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
index 1fc8d36..02a8926 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
@@ -22,11 +22,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -50,8 +50,12 @@ public class ZooKeeperStorage implements TempletonStorage {
public String overhead_path = null;
public static final String ZK_HOSTS = "templeton.zookeeper.hosts";
- public static final String ZK_SESSION_TIMEOUT
- = "templeton.zookeeper.session-timeout";
+ public static final String ZK_SESSION_TIMEOUT = "templeton.zookeeper.session-timeout";
+ public static final String ZK_SSL_ENABLE = "templeton.zookeeper.ssl.client.enable";
+ public static final String ZK_KEYSTORE_LOCATION = "templeton.zookeeper.keystore.location";
+ public static final String ZK_KEYSTORE_PASSWORD = "templeton.zookeeper.keystore.password";
+ public static final String ZK_TRUSTSTORE_LOCATION = "templeton.zookeeper.truststore.location";
+ public static final String ZK_TRUSTSTORE_PASSWORD = "templeton.zookeeper.truststore.password";
public static final String ENCODING = "UTF-8";
@@ -59,27 +63,25 @@ public class ZooKeeperStorage implements TempletonStorage {
private CuratorFramework zk;
- /**
- * Open a ZooKeeper connection for the JobState.
- */
- public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
- throws IOException {
- //do we need to add a connection status listener? What will that do?
- ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
- CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
- CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
- zk.start();
- return zk;
- }
/**
* Open a ZooKeeper connection for the JobState.
*/
public static CuratorFramework zkOpen(Configuration conf) throws IOException {
- /*the silly looking call to Builder below is to get the default value of session timeout
- from Curator which itself exposes it as system property*/
- return zkOpen(conf.get(ZK_HOSTS),
- conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
+ ZooKeeperHiveHelper xkHelper = ZooKeeperHiveHelper.builder()
+ .quorum(conf.get(ZK_HOSTS))
+ .sessionTimeout(conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()))
+ .baseSleepTime(1000)
+ .maxRetries(3)
+ .sslEnabled(conf.getBoolean(ZK_SSL_ENABLE, false))
+ .keyStoreLocation(conf.get(ZK_KEYSTORE_LOCATION, ""))
+ .keyStorePassword(conf.get(ZK_KEYSTORE_PASSWORD, ""))
+ .trustStoreLocation(conf.get(ZK_TRUSTSTORE_LOCATION, ""))
+ .trustStorePassword(conf.get(ZK_TRUSTSTORE_PASSWORD, ""))
+ .build();
+ CuratorFramework zk = xkHelper.getNewZookeeperClient();
+ zk.start();
+ return zk;
}
public ZooKeeperStorage() {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
new file mode 100644
index 0000000..084e097
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStorePlain.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import org.junit.BeforeClass;
+
+/**
+ * TestZookeeperTokenStore with zookeeper SSL communication disabled.
+ */
+public class TestZookeeperTokenStorePlain extends ZooKeeperTokenStoreTestBase {
+
+ public TestZookeeperTokenStorePlain(){
+ super();
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ setUpInternal(false);
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
new file mode 100644
index 0000000..4017990
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZookeeperTokenStoreSSLEnabled.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import org.junit.BeforeClass;
+
+/**
+ * Runs the tests inherited from {@link ZooKeeperTokenStoreTestBase} with ZooKeeper SSL communication enabled.
+ */
+public class TestZookeeperTokenStoreSSLEnabled extends ZooKeeperTokenStoreTestBase {
+
+ public TestZookeeperTokenStoreSSLEnabled(){
+ super();
+ }
+
+ @BeforeClass // JUnit runs this once for the class, before the inherited tests
+ public static void setUp() throws Exception {
+ setUpInternal(true); // true = start the shared MiniZooKeeperCluster with SSL; createConf then adds the keystore/truststore settings
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
similarity index 76%
rename from itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
rename to itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
index 603155b..35053e7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/TestZooKeeperTokenStore.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStoreTestBase.java
@@ -25,67 +25,84 @@ import java.util.List;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
-import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
import org.apache.hive.testutils.MiniZooKeeperCluster;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
+import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Test;
+
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
-import org.junit.Before;
-import org.junit.After;
-import org.junit.Test;
/**
* TestZooKeeperTokenStore.
*/
-public class TestZooKeeperTokenStore {
+public abstract class ZooKeeperTokenStoreTestBase {
+
+ private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
+ private static final String TRUST_STORE_NAME = "truststore.jks";
+ private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
- private MiniZooKeeperCluster zkCluster = null;
- private CuratorFramework zkClient = null;
- private int zkPort = -1;
- private ZooKeeperTokenStore ts;
+ private static MiniZooKeeperCluster zkCluster = null;
+ private static int zkPort = -1;
+ private static ZooKeeperTokenStore ts;
+ private static boolean zkSslEnabled;
- @Before
- public void setUp() throws Exception {
+ public static void setUpInternal(boolean sslEnabled) throws Exception{
File zkDataDir = new File(System.getProperty("test.tmp.dir"));
- if (this.zkCluster != null) {
+ if (zkCluster != null) {
throw new IOException("Cluster already running");
}
- this.zkCluster = new MiniZooKeeperCluster();
- this.zkPort = this.zkCluster.startup(zkDataDir);
- this.zkClient =
- CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
- this.zkClient.start();
+ zkCluster = new MiniZooKeeperCluster(sslEnabled);
+ zkPort = zkCluster.startup(zkDataDir);
+ zkSslEnabled = sslEnabled;
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception{
+ zkCluster.shutdown();
+ zkCluster = null;
}
@After
- public void tearDown() throws Exception {
- this.zkClient.close();
+ public void closeTokenStore() throws Exception{
if (ts != null) {
ts.close();
}
- this.zkCluster.shutdown();
- this.zkCluster = null;
}
private Configuration createConf(String zkPath) {
Configuration conf = new Configuration();
conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:"
- + this.zkPort);
+ + zkPort);
conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath);
+ if(zkSslEnabled) {
+ String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
+ System.getProperty("test.data.files") :
+ (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
+ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION,
+ dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
+ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD,
+ KEY_STORE_TRUST_STORE_PASSWORD);
+ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION,
+ dataFileDir + File.separator + TRUST_STORE_NAME);
+ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD,
+ KEY_STORE_TRUST_STORE_PASSWORD);
+ conf.set(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, "true");
+
+ }
return conf;
}
@@ -98,6 +115,7 @@ public class TestZooKeeperTokenStore {
ts.setConf(conf);
ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
+ CuratorFramework zkClient = ts.getSession();
String metastore_zk_path = ZK_PATH + ServerMode.METASTORE;
int keySeq = ts.addMasterKey("key1Data");
@@ -112,6 +130,7 @@ public class TestZooKeeperTokenStore {
ts.removeMasterKey(keySeq);
assertEquals("expected number keys", 1, ts.getMasterKeys().length);
+ ts.removeMasterKey(keySeq2);
// tokens
DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(
@@ -189,6 +208,8 @@ public class TestZooKeeperTokenStore {
ts = new ZooKeeperTokenStore();
ts.setConf(conf);
ts.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
+
+ CuratorFramework zkClient = ts.getSession();
List<ACL> acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE);
assertEquals(2, acl.size());
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
index cc32a7e..596c3d6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRestrictedList.java
@@ -111,6 +111,10 @@ public class TestRestrictedList {
addToExpectedRestrictedMap("spark.home");
addToExpectedRestrictedMap("hive.privilege.synchronizer.interval");
addToExpectedRestrictedMap("hive.driver.parallel.compilation.global.limit");
+ addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.location");
+ addToExpectedRestrictedMap("hive.zookeeper.ssl.keystore.password");
+ addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.location");
+ addToExpectedRestrictedMap("hive.zookeeper.ssl.truststore.password");
}
@AfterClass
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
index bd5e811..3322434 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java
@@ -22,11 +22,10 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
@@ -165,9 +164,8 @@ public class TestServiceDiscovery {
// Publish configs for this instance as the data on the node
znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confs);
byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
- PersistentEphemeralNode znode =
- new PersistentEphemeralNode(client,
- PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
+ PersistentNode znode = new PersistentNode(client, CreateMode.EPHEMERAL_SEQUENTIAL,
+ false, pathPrefix, znodeDataUTF8);
znode.start();
// We'll wait for 120s for node creation
long znodeCreationTimeout = 120;
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
similarity index 95%
rename from itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
rename to itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
index de2e493..7302e09 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilege.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/InformationSchemaWithPrivilegeTestBase.java
@@ -19,6 +19,7 @@
package org.apache.hive.service.server;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -54,14 +55,14 @@ import org.apache.hive.service.cli.CLIServiceClient;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.SessionHandle;
+import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test restricted information schema with privilege synchronization
*/
-public class TestInformationSchemaWithPrivilege {
+public abstract class InformationSchemaWithPrivilegeTestBase {
// Group mapping:
// group_a: user1, user2
@@ -175,14 +176,18 @@ public class TestInformationSchemaWithPrivilege {
}
}
+ private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
+ private static final String TRUST_STORE_NAME = "truststore.jks";
+ private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
+
private static MiniHS2 miniHS2 = null;
private static MiniZooKeeperCluster zkCluster = null;
private static Map<String, String> confOverlay;
- @BeforeClass
- public static void beforeTest() throws Exception {
+
+ public static void setupInternal(boolean zookeeperSSLEnabled) throws Exception {
File zkDataDir = new File(System.getProperty("test.tmp.dir"));
- zkCluster = new MiniZooKeeperCluster();
+ zkCluster = new MiniZooKeeperCluster(zookeeperSSLEnabled);
int zkPort = zkCluster.startup(zkDataDir);
miniHS2 = new MiniHS2(new HiveConf());
@@ -206,9 +211,34 @@ public class TestInformationSchemaWithPrivilege {
confOverlay.put(ConfVars.HIVE_AUTHENTICATOR_MANAGER.varname, FakeGroupAuthenticator.class.getName());
confOverlay.put(ConfVars.HIVE_AUTHORIZATION_ENABLED.varname, "true");
confOverlay.put(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST.varname, ".*");
+
+ if(zookeeperSSLEnabled) {
+ String dataFileDir = !System.getProperty("test.data.files", "").isEmpty() ?
+ System.getProperty("test.data.files") :
+ (new HiveConf()).get("test.data.files").replace('\\', '/').replace("c:", "");
+ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION.varname,
+ dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME);
+ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
+ KEY_STORE_TRUST_STORE_PASSWORD);
+ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION.varname,
+ dataFileDir + File.separator + TRUST_STORE_NAME);
+ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
+ KEY_STORE_TRUST_STORE_PASSWORD);
+ confOverlay.put(ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE.varname, "true");
+ }
miniHS2.start(confOverlay);
}
+ @AfterClass // class-level cleanup for the servers started by setupInternal
+ public static void tearDown() throws IOException {
+ if (miniHS2 != null) { // still null if setupInternal failed before creating it
+ miniHS2.stop();
+ }
+ if (zkCluster != null) { // still null if setupInternal failed before creating it
+ zkCluster.shutdown();
+ }
+ }
+
@Test
public void test() throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
new file mode 100644
index 0000000..ffa1843
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperPlain.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import org.junit.BeforeClass;
+
+/**
+ * Runs the {@link InformationSchemaWithPrivilegeTestBase} test (restricted information schema with privilege synchronization) with ZooKeeper SSL communication disabled.
+ */
+public class TestInformationSchemaWithPrivilegeZookeeperPlain extends InformationSchemaWithPrivilegeTestBase {
+
+ public TestInformationSchemaWithPrivilegeZookeeperPlain() {
+ super();
+ }
+
+ @BeforeClass // JUnit runs this once for the class, before the inherited test
+ public static void setUp() throws Exception{
+ setupInternal(false); // false = MiniZooKeeperCluster without SSL; no ZooKeeper SSL settings are added to the HiveServer2 conf overlay
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
new file mode 100644
index 0000000..e12f494
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestInformationSchemaWithPrivilegeZookeeperSSL.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import org.junit.BeforeClass;
+
+/**
+ * Runs the {@link InformationSchemaWithPrivilegeTestBase} test (restricted information schema with privilege synchronization) with ZooKeeper SSL communication enabled.
+ */
+public class TestInformationSchemaWithPrivilegeZookeeperSSL extends InformationSchemaWithPrivilegeTestBase {
+
+ public TestInformationSchemaWithPrivilegeZookeeperSSL() {
+ super();
+ }
+
+ @BeforeClass // JUnit runs this once for the class, before the inherited test
+ public static void setUp() throws Exception{
+ setupInternal(true); // true = MiniZooKeeperCluster with SSL; keystore/truststore settings are added to the HiveServer2 conf overlay
+ }
+}
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index e23826e..6cb6853 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -116,6 +116,11 @@ public class Utils {
public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
public static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA = "zooKeeperHA";
public static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
+ public static final String ZOOKEEPER_SSL_ENABLE = "zooKeeperSSLEnable";
+ public static final String ZOOKEEPER_KEYSTORE_LOCATION = "zooKeeperKeystoreLocation";
+ public static final String ZOOKEEPER_KEYSTORE_PASSWORD= "zooKeeperKeystorePassword";
+ public static final String ZOOKEEPER_TRUSTSTORE_LOCATION = "zooKeeperTruststoreLocation";
+ public static final String ZOOKEEPER_TRUSTSTORE_PASSWORD = "zooKeeperTruststorePassword";
// Default namespace value on ZooKeeper.
// This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
@@ -168,6 +173,11 @@ public class Utils {
private boolean isEmbeddedMode = false;
private String suppliedURLAuthority;
private String zooKeeperEnsemble = null;
+ private boolean zooKeeperSslEnabled = false;
+ private String zookeeperKeyStoreLocation = "";
+ private String zookeeperKeyStorePassword = "";
+ private String zookeeperTrustStoreLocation = "";
+ private String zookeeperTrustStorePassword = "";
private String currentHostZnodePath;
private final List<String> rejectedHostZnodePaths = new ArrayList<String>();
@@ -185,6 +195,12 @@ public class Utils {
this.isEmbeddedMode = params.isEmbeddedMode;
this.suppliedURLAuthority = params.suppliedURLAuthority;
this.zooKeeperEnsemble = params.zooKeeperEnsemble;
+ this.zooKeeperSslEnabled = params.zooKeeperSslEnabled;
+ this.zookeeperKeyStoreLocation = params.zookeeperKeyStoreLocation;
+ this.zookeeperKeyStorePassword = params.zookeeperKeyStorePassword;
+ this.zookeeperTrustStoreLocation = params.zookeeperTrustStoreLocation;
+ this.zookeeperTrustStorePassword = params.zookeeperTrustStorePassword;
+
this.currentHostZnodePath = params.currentHostZnodePath;
this.rejectedHostZnodePaths.addAll(rejectedHostZnodePaths);
}
@@ -228,6 +244,25 @@ public class Utils {
public String getZooKeeperEnsemble() {
return zooKeeperEnsemble;
}
+ public boolean isZooKeeperSslEnabled() {
+ return zooKeeperSslEnabled;
+ }
+
+ public String getZookeeperKeyStoreLocation() {
+ return zookeeperKeyStoreLocation;
+ }
+
+ public String getZookeeperKeyStorePassword() {
+ return zookeeperKeyStorePassword;
+ }
+
+ public String getZookeeperTrustStoreLocation() {
+ return zookeeperTrustStoreLocation;
+ }
+
+ public String getZookeeperTrustStorePassword() {
+ return zookeeperTrustStorePassword;
+ }
public List<String> getRejectedHostZnodePaths() {
return rejectedHostZnodePaths;
@@ -277,6 +312,26 @@ public class Utils {
this.zooKeeperEnsemble = zooKeeperEnsemble;
}
+ public void setZooKeeperSslEnabled(boolean zooKeeperSslEnabled) {
+ this.zooKeeperSslEnabled = zooKeeperSslEnabled;
+ }
+
+ public void setZookeeperKeyStoreLocation(String zookeeperKeyStoreLocation) {
+ this.zookeeperKeyStoreLocation = zookeeperKeyStoreLocation;
+ }
+
+ public void setZookeeperKeyStorePassword(String zookeeperKeyStorePassword) {
+ this.zookeeperKeyStorePassword = zookeeperKeyStorePassword;
+ }
+
+ public void setZookeeperTrustStoreLocation(String zookeeperTrustStoreLocation) {
+ this.zookeeperTrustStoreLocation = zookeeperTrustStoreLocation;
+ }
+
+ public void setZookeeperTrustStorePassword(String zookeeperTrustStorePassword) {
+ this.zookeeperTrustStorePassword = zookeeperTrustStorePassword;
+ }
+
public void setCurrentHostZnodePath(String currentHostZnodePath) {
this.currentHostZnodePath = currentHostZnodePath;
}
@@ -485,6 +540,7 @@ public class Utils {
uri = uri.replace(dummyAuthorityString, authorityStr);
// Set ZooKeeper ensemble in connParams for later use
connParams.setZooKeeperEnsemble(authorityStr);
+ ZooKeeperHiveClientHelper.setZkSSLParams(connParams);
} else {
URI jdbcBaseURI = URI.create(URI_HIVE_PREFIX + "//" + authorityStr);
// Check to prevent unintentional use of embedded mode. A missing "/"
@@ -576,7 +632,6 @@ public class Utils {
* host:port pairs.
*
* @param uri
- * @param connParams
* @return
* @throws JdbcUriParseException
*/
diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
index 759ba8a..3d89fa2 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
@@ -32,6 +32,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.SSLZookeeperFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
@@ -85,11 +86,40 @@ class ZooKeeperHiveClientHelper {
JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER_HA.equalsIgnoreCase(discoveryMode));
}
+ /**
+ * Copies the ZooKeeper SSL settings from the session vars of connParams onto connParams: the ZOOKEEPER_SSL_ENABLE flag and, only when it parses as true, the keystore/truststore locations and passwords.
+ * @param connParams connection parameters; its session vars are read and its ZooKeeper SSL fields are set
+ */
+ public static void setZkSSLParams(JdbcConnectionParams connParams) {
+ Map<String, String> sessionConf = connParams.getSessionVars();
+ boolean sslEnabled = false; // SSL stays off unless the session vars enable it
+ if (sessionConf.containsKey(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE)) { // when the key is absent, the SSL flag on connParams is not touched
+ sslEnabled = Boolean.parseBoolean(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_SSL_ENABLE));
+ connParams.setZooKeeperSslEnabled(sslEnabled);
+ }
+ if (sslEnabled) { // keystore/truststore settings are only read when SSL is on
+ connParams.setZookeeperKeyStoreLocation(
+ StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_LOCATION), ""));
+ connParams.setZookeeperKeyStorePassword(
+ StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_KEYSTORE_PASSWORD), ""));
+ connParams.setZookeeperTrustStoreLocation(
+ StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_LOCATION), ""));
+ connParams.setZookeeperTrustStorePassword(
+ StringUtils.defaultString(sessionConf.get(JdbcConnectionParams.ZOOKEEPER_TRUSTSTORE_PASSWORD), ""));
+ }
+ }
+
private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception {
String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
CuratorFramework zooKeeperClient =
- CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ CuratorFrameworkFactory.builder()
+ .connectString(zooKeeperEnsemble)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .zookeeperFactory(
+ new SSLZookeeperFactory(connParams.isZooKeeperSslEnabled(), connParams.getZookeeperKeyStoreLocation(),
+ connParams.getZookeeperKeyStorePassword(), connParams.getZookeeperTrustStoreLocation(),
+ connParams.getZookeeperTrustStorePassword()))
+ .build();
zooKeeperClient.start();
return zooKeeperClient;
}
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index d28fd17..2b21baa 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -30,20 +30,18 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapUtil;
@@ -55,6 +53,7 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMars
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.InvalidACLException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
@@ -111,7 +110,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
private final Map<String, Set<InstanceType>> nodeToInstanceCache;
// The registration znode.
- private PersistentEphemeralNode znode;
+ private PersistentNode znode;
private String znodePath; // unique identity for this instance
private PathChildrenCache instancesCache; // Created on demand.
@@ -212,28 +211,23 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
}
private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
- String zkEnsemble = getQuorumServers(conf);
- int sessionTimeout = (int) HiveConf.getTimeVar(conf,
- ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
- int connectionTimeout = (int) HiveConf.getTimeVar(conf,
- ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
- int baseSleepTime = (int) HiveConf.getTimeVar(conf,
- ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
- int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
-
- LOG.info("Creating curator client with connectString: {} sessionTimeoutMs: {} connectionTimeoutMs: {}" +
- " namespace: {} exponentialBackoff - sleepTime: {} maxRetries: {}", zkEnsemble, sessionTimeout,
- connectionTimeout, namespace, baseSleepTime, maxRetries);
- // Create a CuratorFramework instance to be used as the ZooKeeper client
- // Use the zooKeeperAclProvider to create appropriate ACLs
- return CuratorFrameworkFactory.builder()
- .connectString(zkEnsemble)
- .sessionTimeoutMs(sessionTimeout)
- .connectionTimeoutMs(connectionTimeout)
- .aclProvider(zooKeeperAclProvider)
- .namespace(namespace)
- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
- .build();
+ return ZooKeeperHiveHelper.builder()
+ .quorum(conf.get(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname))
+ .clientPort(conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
+ ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()))
+ .connectionTimeout(
+ (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
+ .sessionTimeout(
+ (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS))
+ .baseSleepTime(
+ (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS))
+ .maxRetries(HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES))
+ .sslEnabled(HiveConf.getBoolVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE))
+ .keyStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
+ .keyStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
+ .trustStoreLocation(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
+ .trustStorePassword(HiveConf.getVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD))
+ .build().getNewZookeeperClient(zooKeeperAclProvider, namespace);
}
private static List<ACL> createSecureAcls() {
@@ -283,9 +277,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
// Create a znode under the rootNamespace parent for this instance of the server
try {
- // PersistentEphemeralNode will make sure the ephemeral node created on server will be present
+ // PersistentNode will make sure the ephemeral node created on server will be present
// even under connection or session interruption (will automatically handle retries)
- znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
+ znode = new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false,
workersPath + "/" + workerNodePrefix, encoder.toBytes(srv));
// start the creation of znodes
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
index fa3a382..e8eaac0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
@@ -18,14 +18,11 @@
package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
-import java.util.concurrent.TimeUnit;
import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hive.conf.HiveConf;
public class CuratorFrameworkSingleton {
@@ -50,15 +47,8 @@ public class CuratorFrameworkSingleton {
} else {
conf = hiveConf;
}
- int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
- int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
- int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
- String quorumServers = conf.getZKConfig().getQuorumServers();
- sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers)
- .sessionTimeoutMs(sessionTimeout)
- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
- .build();
+ sharedClient = conf.getZKConfig().getNewZookeeperClient();
sharedClient.start();
}
diff --git a/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java b/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
index eec6282..dc11ae1 100644
--- a/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
+++ b/ql/src/test/org/apache/hive/testutils/MiniZooKeeperCluster.java
@@ -34,7 +34,9 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.X509Util;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.slf4j.Logger;
@@ -54,6 +56,9 @@ public class MiniZooKeeperCluster {
private static final int TICK_TIME = 2000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
+ private static final String LOCALHOST_KEY_STORE_NAME = "keystore.jks";
+ private static final String TRUST_STORE_NAME = "truststore.jks";
+ private static final String KEY_STORE_TRUST_STORE_PASSWORD = "HiveJdbc";
private int connectionTimeout;
private boolean started;
@@ -61,7 +66,7 @@ public class MiniZooKeeperCluster {
/** The default port. If zero, we use a random port. */
private int defaultClientPort = 0;
- private List<NIOServerCnxnFactory> standaloneServerFactoryList;
+ private List<ServerCnxnFactory> standaloneServerFactoryList;
private List<ZooKeeperServer> zooKeeperServers;
private List<Integer> clientPortList;
@@ -70,11 +75,20 @@ public class MiniZooKeeperCluster {
private Configuration configuration;
+ private boolean sslEnabled = false;
+
public MiniZooKeeperCluster() {
this(new Configuration());
}
-
+ /**
+ * Creates a mini cluster on top of a freshly constructed {@link Configuration}.
+ *
+ * @param sslEnabled passed unchanged to the (Configuration, boolean) constructor
+ */
+ public MiniZooKeeperCluster(boolean sslEnabled) {
+ this(new Configuration(), sslEnabled);
+ }
public MiniZooKeeperCluster(Configuration configuration) {
+ this(configuration, false);
+ }
+
+ public MiniZooKeeperCluster(Configuration configuration, boolean sslEnabled) {
+ this.sslEnabled = sslEnabled;
this.started = false;
this.configuration = configuration;
activeZKServerIndex = -1;
@@ -167,7 +181,7 @@ public class MiniZooKeeperCluster {
}
// / XXX: From o.a.zk.t.ClientBase
- private static void setupTestEnv() {
+ private static void setupTestEnv(boolean sslEnabled) {
// With ZooKeeper 3.5 we need to whitelist the 4 letter commands we use
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
@@ -176,6 +190,9 @@ public class MiniZooKeeperCluster {
// resulting in test failure (client timeout on first session).
// set env and directly in order to handle static init/gc issues
System.setProperty("zookeeper.preAllocSize", "100");
+ if (sslEnabled) {
+ System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
+ }
FileTxnLog.setPreallocSize(100 * 1024);
}
@@ -200,7 +217,7 @@ public class MiniZooKeeperCluster {
return -1;
}
- setupTestEnv();
+ setupTestEnv(sslEnabled);
shutdown();
int tentativePort = -1; // the seed port
@@ -229,12 +246,10 @@ public class MiniZooKeeperCluster {
// Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
- NIOServerCnxnFactory standaloneServerFactory;
+ ServerCnxnFactory standaloneServerFactory;
while (true) {
try {
- standaloneServerFactory = new NIOServerCnxnFactory();
- standaloneServerFactory.configure(new InetSocketAddress(currentClientPort),
- configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
+ standaloneServerFactory = createServerCnxnFactory(currentClientPort);
} catch (BindException e) {
LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
// We're told to use some port but it's occupied, fail
@@ -252,7 +267,7 @@ public class MiniZooKeeperCluster {
// Start up this ZK server
standaloneServerFactory.startup(server);
// Runs a 'stat' against the servers.
- if (!waitForServerUp(currentClientPort, connectionTimeout)) {
+ if (!sslEnabled && !waitForServerUp(currentClientPort, connectionTimeout)) {
throw new IOException("Waiting for startup of standalone server");
}
@@ -276,6 +291,36 @@ public class MiniZooKeeperCluster {
return clientPort;
}
+  /**
+   * Creates the connection factory for one server and configures it for the given client
+   * port, using the maximum client connection count read from the configuration.
+   *
+   * With SSL enabled, the factory implementation and the key/trust store locations and
+   * passwords are published as JVM system properties before the factory is created, and the
+   * factory is configured with the secure flag set.
+   *
+   * @param currentClientPort port handed to the factory's configure call
+   * @return the configured factory
+   * @throws IOException if creating or configuring the factory fails
+   */
+  private ServerCnxnFactory createServerCnxnFactory(int currentClientPort) throws IOException {
+    if (!sslEnabled) {
+      ServerCnxnFactory plainFactory = ServerCnxnFactory.createFactory();
+      plainFactory.configure(new InetSocketAddress(currentClientPort),
+          configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
+      return plainFactory;
+    }
+
+    System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
+        "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+    // The system property wins; otherwise use the configuration value with backslashes
+    // turned into slashes and any "c:" removed.
+    String dataFileDir = System.getProperty("test.data.files", "");
+    if (dataFileDir.isEmpty()) {
+      dataFileDir = configuration.get("test.data.files").replace('\\', '/').replace("c:", "");
+    }
+    String keyStorePath = dataFileDir + File.separator + LOCALHOST_KEY_STORE_NAME;
+    String trustStorePath = dataFileDir + File.separator + TRUST_STORE_NAME;
+
+    // The X509 helper is only used to look up the names of the SSL system properties.
+    X509Util x509Util = new ClientX509Util();
+    System.setProperty(x509Util.getSslKeystoreLocationProperty(), keyStorePath);
+    System.setProperty(x509Util.getSslKeystorePasswdProperty(), KEY_STORE_TRUST_STORE_PASSWORD);
+    System.setProperty(x509Util.getSslTruststoreLocationProperty(), trustStorePath);
+    System.setProperty(x509Util.getSslTruststorePasswdProperty(), KEY_STORE_TRUST_STORE_PASSWORD);
+
+    ServerCnxnFactory secureFactory = ServerCnxnFactory.createFactory();
+    secureFactory.configure(new InetSocketAddress(currentClientPort),
+        configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS),
+        true);
+    return secureFactory;
+  }
+
+
private void createDir(File dir) throws IOException {
try {
if (!dir.exists()) {
@@ -292,7 +337,7 @@ public class MiniZooKeeperCluster {
public void shutdown() throws IOException {
// shut down all the zk servers
for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
- NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
+ ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
int clientPort = clientPortList.get(i);
standaloneServerFactory.shutdown();
@@ -305,6 +350,7 @@ public class MiniZooKeeperCluster {
for (ZooKeeperServer zkServer : zooKeeperServers) {
//explicitly close ZKDatabase since ZookeeperServer does not close them
zkServer.getZKDatabase().close();
+ zkServer.shutdown(true);
}
zooKeeperServers.clear();
@@ -328,7 +374,7 @@ public class MiniZooKeeperCluster {
}
// Shutdown the current active one
- NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
+ ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(activeZKServerIndex);
int clientPort = clientPortList.get(activeZKServerIndex);
standaloneServerFactory.shutdown();
@@ -366,7 +412,7 @@ public class MiniZooKeeperCluster {
int backupZKServerIndex = activeZKServerIndex + 1;
// Shutdown the current active one
- NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
+ ServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(backupZKServerIndex);
int clientPort = clientPortList.get(backupZKServerIndex);
standaloneServerFactory.shutdown();
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index fece82e..181ea5d 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -43,14 +43,12 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
@@ -1078,14 +1076,9 @@ public class HiveServer2 extends CompositeService {
*/
static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
HiveConf hiveConf = new HiveConf();
- String zooKeeperEnsemble = hiveConf.getZKConfig().getQuorumServers();
- String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
- int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
- int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
- CuratorFramework zooKeeperClient =
- CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
+ CuratorFramework zooKeeperClient = hiveConf.getZKConfig().getNewZookeeperClient();
zooKeeperClient.start();
+ String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
List<String> znodePaths =
zooKeeperClient.getChildren().forPath(
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
new file mode 100644
index 0000000..ee01731
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/SSLZookeeperFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory to create Zookeeper clients with the zookeeper.client.secure enabled,
+ * allowing SSL communication with the Zookeeper server.
+ *
+ * When SSL is disabled the factory creates a ZooKeeper client without any extra client
+ * configuration. All settings are fixed at construction time.
+ */
+public class SSLZookeeperFactory implements ZookeeperFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SSLZookeeperFactory.class);
+
+  private final boolean sslEnabled;
+  private final String keyStoreLocation;
+  private final String keyStorePassword;
+  private final String trustStoreLocation;
+  private final String trustStorePassword;
+
+  /**
+   * Stores the SSL settings used for every client created by this factory.
+   *
+   * A missing key store or trust store location is only logged as a warning when SSL is
+   * enabled; construction still succeeds.
+   *
+   * @param sslEnabled whether created clients are configured for SSL
+   * @param keyStoreLocation key store location handed to the client configuration
+   * @param keyStorePassword key store password handed to the client configuration
+   * @param trustStoreLocation trust store location handed to the client configuration
+   * @param trustStorePassword trust store password handed to the client configuration
+   */
+  public SSLZookeeperFactory(boolean sslEnabled, String keyStoreLocation, String keyStorePassword,
+      String trustStoreLocation, String trustStorePassword) {
+
+    this.sslEnabled = sslEnabled;
+    this.keyStoreLocation = keyStoreLocation;
+    this.keyStorePassword = keyStorePassword;
+    this.trustStoreLocation = trustStoreLocation;
+    this.trustStorePassword = trustStorePassword;
+    if (sslEnabled) {
+      if (StringUtils.isEmpty(keyStoreLocation)) {
+        LOG.warn("Missing keystoreLocation parameter");
+      }
+      if (StringUtils.isEmpty(trustStoreLocation)) {
+        LOG.warn("Missing trustStoreLocation parameter");
+      }
+    }
+  }
+
+  /**
+   * Creates a ZooKeeper client; with SSL enabled the client is given a configuration that
+   * turns on the secure client flag, selects the Netty client socket and carries the
+   * key/trust store settings of this factory.
+   *
+   * @param connectString connect string passed to the ZooKeeper constructor
+   * @param sessionTimeout session timeout passed to the ZooKeeper constructor
+   * @param watcher watcher passed to the ZooKeeper constructor
+   * @param canBeReadOnly read-only flag passed to the ZooKeeper constructor
+   * @return the newly created client
+   * @throws Exception if the client cannot be created
+   */
+  @Override
+  public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
+      boolean canBeReadOnly) throws Exception {
+    if (!this.sslEnabled) {
+      return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
+    }
+    ZKClientConfig clientConfig = new ZKClientConfig();
+    clientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+    clientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
+    // The X509 helper only supplies the property names for the store settings.
+    ClientX509Util x509Util = new ClientX509Util();
+    clientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(), this.keyStoreLocation);
+    clientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(), this.keyStorePassword);
+    clientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(), this.trustStoreLocation);
+    clientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(), this.trustStorePassword);
+    return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, clientConfig);
+  }
+}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
index 99f7c97..71d8651 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java
@@ -23,10 +23,11 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.List;
+
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -55,28 +56,164 @@ public class ZooKeeperHiveHelper {
public static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHiveHelper.class.getName());
public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
- private String quorum = null;
- private String clientPort = null;
- private String rootNamespace = null;
- private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
+  /**
+   * ZooKeeperHiveHelperBuilder. A builder class to initialize ZooKeeperHiveHelper.
+   *
+   * Each setter stores its argument and returns this builder so that calls can be chained;
+   * each getter returns the stored value as is. Settings that are never set keep the Java
+   * field defaults (null, 0 or false).
+   */
+  public static class ZooKeeperHiveHelperBuilder {
+    // Where to connect and which namespace to register under.
+    private String quorum;
+    private String clientPort;
+    private String serverRegistryNameSpace;
+    // Timeouts and retry settings.
+    private int connectionTimeout;
+    private int sessionTimeout;
+    private int baseSleepTime;
+    private int maxRetries;
+    // SSL settings.
+    private boolean sslEnabled;
+    private String keyStoreLocation;
+    private String keyStorePassword;
+    private String trustStoreLocation;
+    private String trustStorePassword;
+
+    public ZooKeeperHiveHelperBuilder quorum(String quorum) {
+      this.quorum = quorum;
+      return this;
+    }
+
+    public String getQuorum() {
+      return quorum;
+    }
+
+    public ZooKeeperHiveHelperBuilder clientPort(String clientPort) {
+      this.clientPort = clientPort;
+      return this;
+    }
+
+    public String getClientPort() {
+      return clientPort;
+    }
+
+    public ZooKeeperHiveHelperBuilder serverRegistryNameSpace(String serverRegistryNameSpace) {
+      this.serverRegistryNameSpace = serverRegistryNameSpace;
+      return this;
+    }
+
+    public String getServerRegistryNameSpace() {
+      return serverRegistryNameSpace;
+    }
+
+    public ZooKeeperHiveHelperBuilder connectionTimeout(int connectionTimeout) {
+      this.connectionTimeout = connectionTimeout;
+      return this;
+    }
+
+    public int getConnectionTimeout() {
+      return connectionTimeout;
+    }
+
+    public ZooKeeperHiveHelperBuilder sessionTimeout(int sessionTimeout) {
+      this.sessionTimeout = sessionTimeout;
+      return this;
+    }
+
+    public int getSessionTimeout() {
+      return sessionTimeout;
+    }
+
+    public ZooKeeperHiveHelperBuilder baseSleepTime(int baseSleepTime) {
+      this.baseSleepTime = baseSleepTime;
+      return this;
+    }
+
+    public int getBaseSleepTime() {
+      return baseSleepTime;
+    }
+
+    public ZooKeeperHiveHelperBuilder maxRetries(int maxRetries) {
+      this.maxRetries = maxRetries;
+      return this;
+    }
+
+    public int getMaxRetries() {
+      return maxRetries;
+    }
+
+    public ZooKeeperHiveHelperBuilder sslEnabled(boolean sslEnabled) {
+      this.sslEnabled = sslEnabled;
+      return this;
+    }
+
+    public boolean isSslEnabled() {
+      return sslEnabled;
+    }
+
+    public ZooKeeperHiveHelperBuilder keyStoreLocation(String keyStoreLocation) {
+      this.keyStoreLocation = keyStoreLocation;
+      return this;
+    }
+
+    public String getKeyStoreLocation() {
+      return keyStoreLocation;
+    }
+
+    public ZooKeeperHiveHelperBuilder keyStorePassword(String keyStorePassword) {
+      this.keyStorePassword = keyStorePassword;
+      return this;
+    }
+
+    public String getKeyStorePassword() {
+      return keyStorePassword;
+    }
+
+    public ZooKeeperHiveHelperBuilder trustStoreLocation(String trustStoreLocation) {
+      this.trustStoreLocation = trustStoreLocation;
+      return this;
+    }
+
+    public String getTrustStoreLocation() {
+      return trustStoreLocation;
+    }
+
+    public ZooKeeperHiveHelperBuilder trustStorePassword(String trustStorePassword) {
+      this.trustStorePassword = trustStorePassword;
+      return this;
+    }
+
+    public String getTrustStorePassword() {
+      return trustStorePassword;
+    }
+
+    /**
+     * Hands this builder to the ZooKeeperHiveHelper constructor.
+     *
+     * @return the helper created from the current settings
+     */
+    public ZooKeeperHiveHelper build() {
+      return new ZooKeeperHiveHelper(this);
+    }
+  }
+
+  /**
+   * Starting point for configuring a ZooKeeperHiveHelper through chained setter calls.
+   *
+   * @return a new builder with no settings applied
+   */
+  public static ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder builder() {
+    ZooKeeperHiveHelperBuilder freshBuilder = new ZooKeeperHiveHelperBuilder();
+    return freshBuilder;
+  }
+
+ private String quorum;
+ private String rootNamespace;
+ private int connectionTimeout;
private int sessionTimeout;
private int baseSleepTime;
private int maxRetries;
+ private boolean sslEnabled;
+ private SSLZookeeperFactory sslZookeeperFactory;
private CuratorFramework zooKeeperClient;
- private PersistentEphemeralNode znode;
+ private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
+ private PersistentNode znode;
+
- public ZooKeeperHiveHelper(String quorum, String clientPort, String rootNamespace,
- int sessionTimeout, int baseSleepTime, int maxRetries) {
+ public ZooKeeperHiveHelper(ZooKeeperHiveHelperBuilder builder) {
// Get the ensemble server addresses in the format host1:port1, host2:port2, ... . Append
// the configured port to hostname if the hostname doesn't contain a port.
- String[] hosts = quorum.split(",");
+ String[] hosts = builder.getQuorum().split(",");
StringBuilder quorumServers = new StringBuilder();
for (int i = 0; i < hosts.length; i++) {
quorumServers.append(hosts[i].trim());
if (!hosts[i].contains(":")) {
quorumServers.append(":");
- quorumServers.append(clientPort);
+ quorumServers.append(builder.getClientPort());
}
if (i != hosts.length - 1) {
@@ -85,11 +222,19 @@ public class ZooKeeperHiveHelper {
}
this.quorum = quorumServers.toString();
- this.clientPort = clientPort;
- this.rootNamespace = rootNamespace;
- this.sessionTimeout = sessionTimeout;
- this.baseSleepTime = baseSleepTime;
- this.maxRetries = maxRetries;
+ this.rootNamespace = builder.getServerRegistryNameSpace();
+ this.connectionTimeout = builder.getConnectionTimeout();
+ this.sessionTimeout = builder.getSessionTimeout();
+ this.baseSleepTime = builder.getBaseSleepTime();
+ this.maxRetries = builder.getMaxRetries();
+ this.sslEnabled = builder.isSslEnabled();
+ this.sslZookeeperFactory =
+ new SSLZookeeperFactory(sslEnabled,
+ builder.getKeyStoreLocation(),
+ builder.getKeyStorePassword(),
+ builder.getTrustStoreLocation(),
+ builder.getTrustStorePassword());
+
}
/**
@@ -118,8 +263,7 @@ public class ZooKeeperHiveHelper {
+ ZOOKEEPER_PATH_SEPARATOR + znodePathPrefix;
byte[] znodeDataUTF8 = znodeData.getBytes(StandardCharsets.UTF_8);
znode =
- new PersistentEphemeralNode(zooKeeperClient,
- PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
+ new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false, pathPrefix, znodeDataUTF8);
znode.start();
// We'll wait for 120s for node creation
long znodeCreationTimeout = 120;
@@ -147,17 +291,7 @@ public class ZooKeeperHiveHelper {
public CuratorFramework startZookeeperClient(ACLProvider zooKeeperAclProvider,
boolean addParentNode) throws Exception {
- String zooKeeperEnsemble = getQuorumServers();
- // Create a CuratorFramework instance to be used as the ZooKeeper client.
- // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .connectString(zooKeeperEnsemble)
- .sessionTimeoutMs(sessionTimeout)
- .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries));
- if (zooKeeperAclProvider != null) {
- builder = builder.aclProvider(zooKeeperAclProvider);
- }
- CuratorFramework zkClient = builder.build();
+ CuratorFramework zkClient = getNewZookeeperClient(zooKeeperAclProvider);
zkClient.start();
// Create the parent znodes recursively; ignore if the parent already exists.
@@ -177,6 +311,39 @@ public class ZooKeeperHiveHelper {
}
return zkClient;
}
+  /**
+   * Builds a new curator client without an ACL provider and without a namespace.
+   * The returned client has not been started.
+   *
+   * @return the newly built client
+   */
+  public CuratorFramework getNewZookeeperClient() {
+    return getNewZookeeperClient(null, null);
+  }
+
+  /**
+   * Builds a new curator client without a namespace. The returned client has not been started.
+   *
+   * @param zooKeeperAclProvider ACL provider to install on the client; ignored when null
+   * @return the newly built client
+   */
+  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider) {
+    return getNewZookeeperClient(zooKeeperAclProvider, null);
+  }
+
+  /**
+   * Builds a new curator client for the quorum of this helper, using its ZooKeeper factory.
+   * The returned client has not been started.
+   *
+   * Connection and session timeouts are only applied when they are positive; otherwise the
+   * curator builder keeps its own values. A retry policy is always installed.
+   *
+   * @param zooKeeperAclProvider ACL provider to install on the client; ignored when null
+   * @param nameSpace namespace handed to the curator builder, may be null
+   * @return the newly built client
+   */
+  public CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider, String nameSpace) {
+    LOG.info("Creating curator client with connectString: {} namespace: {} sessionTimeoutMs: {}" +
+            " connectionTimeoutMs: {} exponentialBackoff - sleepTime: {} maxRetries: {} sslEnabled: {}",
+        quorum, nameSpace, sessionTimeout,
+        connectionTimeout, baseSleepTime, maxRetries, sslEnabled);
+    // Create a CuratorFramework instance to be used as the ZooKeeper client.
+    // Use the zooKeeperAclProvider, when specified, to create appropriate ACLs.
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+        .connectString(quorum)
+        .namespace(nameSpace)
+        .zookeeperFactory(this.sslZookeeperFactory)
+        // The curator builder has no default retry policy and refuses to build without one,
+        // so a policy is installed even when no retries are configured; a non-positive
+        // maxRetries yields a policy that never retries.
+        .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, Math.max(maxRetries, 0)));
+    if (connectionTimeout > 0) {
+      builder = builder.connectionTimeoutMs(connectionTimeout);
+    }
+    if (sessionTimeout > 0) {
+      builder = builder.sessionTimeoutMs(sessionTimeout);
+    }
+    if (zooKeeperAclProvider != null) {
+      builder = builder.aclProvider(zooKeeperAclProvider);
+    }
+
+    return builder.build();
+  }
public void removeServerInstanceFromZooKeeper() throws Exception {
setDeregisteredWithZooKeeper(true);
@@ -185,7 +352,9 @@ public class ZooKeeperHiveHelper {
znode.close();
znode = null;
}
- zooKeeperClient.close();
+ if (zooKeeperClient != null) {
+ zooKeeperClient.close();
+ }
LOG.info("Server instance removed from ZooKeeper.");
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index fc6a2fd..d6a6c96 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -258,7 +258,11 @@ public class MetastoreConf {
ConfVars.SSL_TRUSTSTORE_PASSWORD.varname,
ConfVars.SSL_TRUSTSTORE_PASSWORD.hiveName,
ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.varname,
- ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName
+ ConfVars.DBACCESS_SSL_TRUSTSTORE_PASSWORD.hiveName,
+ ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.varname,
+ ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD.hiveName,
+ ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.varname,
+ ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD.hiveName
);
public static ConfVars getMetaConf(String name) {
@@ -1072,32 +1076,56 @@ public class MetastoreConf {
"If ZooKeeper is configured for Kerberos authentication. This could be useful when cluster\n" +
"is kerberized, but Zookeeper is not."),
THRIFT_ZOOKEEPER_CLIENT_PORT("metastore.zookeeper.client.port",
- "hive.metastore.zookeeper.client.port", "2181",
+ "hive.zookeeper.client.port", "2181",
"The port of ZooKeeper servers to talk to.\n" +
"If the list of Zookeeper servers specified in hive.metastore.thrift.uris" +
" does not contain port numbers, this value is used."),
THRIFT_ZOOKEEPER_SESSION_TIMEOUT("metastore.zookeeper.session.timeout",
- "hive.metastore.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
+ "hive.zookeeper.session.timeout", 120000L, TimeUnit.MILLISECONDS,
new TimeValidator(TimeUnit.MILLISECONDS),
"ZooKeeper client's session timeout (in milliseconds). The client is disconnected\n" +
"if a heartbeat is not sent in the timeout."),
THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT("metastore.zookeeper.connection.timeout",
- "hive.metastore.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
+ "hive.zookeeper.connection.timeout", 15L, TimeUnit.SECONDS,
new TimeValidator(TimeUnit.SECONDS),
"ZooKeeper client's connection timeout in seconds. " +
"Connection timeout * hive.metastore.zookeeper.connection.max.retries\n" +
"with exponential backoff is when curator client deems connection is lost to zookeeper."),
THRIFT_ZOOKEEPER_NAMESPACE("metastore.zookeeper.namespace",
- "hive.metastore.zookeeper.namespace", "hive_metastore",
+ "hive.zookeeper.namespace", "hive_metastore",
"The parent node under which all ZooKeeper nodes for metastores are created."),
THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES("metastore.zookeeper.connection.max.retries",
- "hive.metastore.zookeeper.connection.max.retries", 3,
+ "hive.zookeeper.connection.max.retries", 3,
"Max number of times to retry when connecting to the ZooKeeper server."),
THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME("metastore.zookeeper.connection.basesleeptime",
- "hive.metastore.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
+ "hive.zookeeper.connection.basesleeptime", 1000L, TimeUnit.MILLISECONDS,
new TimeValidator(TimeUnit.MILLISECONDS),
"Initial amount of time (in milliseconds) to wait between retries\n" +
"when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
+ THRIFT_ZOOKEEPER_SSL_ENABLE("metastore.zookeeper.ssl.client.enable",
+ "hive.zookeeper.ssl.client.enable", false,
+ "Set client to use TLS when connecting to ZooKeeper. An explicit value overrides any value set via the " +
+ "zookeeper.client.secure system property (note the different name). Defaults to false if neither is set."),
+ THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION("metastore.zookeeper.ssl.keystore.location",
+ "hive.zookeeper.ssl.keystore.location", "",
+ "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+ "Overrides any explicit value set via the zookeeper.ssl.keyStore.location " +
+ "system property (note the camelCase)."),
+ THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD("metastore.zookeeper.ssl.keystore.password",
+ "hive.zookeeper.ssl.keystore.password", "",
+ "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+ "Overrides any explicit value set via the zookeeper.ssl.keyStore.password " +
+ "system property (note the camelCase)."),
+ THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION("metastore.zookeeper.ssl.truststore.location",
+ "hive.zookeeper.ssl.truststore.location", "",
+ "Truststore location when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+ "Overrides any explicit value set via the zookeeper.ssl.trustStore.location " +
+ "system property (note the camelCase)."),
+ THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD("metastore.zookeeper.ssl.truststore.password",
+ "hive.zookeeper.ssl.truststore.password", "",
+ "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper. " +
+ "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " +
+ "system property (note the camelCase)."),
THRIFT_URI_SELECTION("metastore.thrift.uri.selection", "hive.metastore.uri.selection", "RANDOM",
new StringSetValidator("RANDOM", "SEQUENTIAL"),
"Determines the selection mechanism used by metastore client to connect to remote " +
@@ -2018,14 +2046,22 @@ public class MetastoreConf {
}
public static ZooKeeperHiveHelper getZKConfig(Configuration conf) {
- return new ZooKeeperHiveHelper(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS),
- MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT),
- MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE),
- (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
- TimeUnit.MILLISECONDS),
- (int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
- TimeUnit.MILLISECONDS),
- MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES));
+ return ZooKeeperHiveHelper.builder()
+ .quorum(MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS))
+ .clientPort(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT))
+ .serverRegistryNameSpace(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_NAMESPACE))
+ .connectionTimeout((int) getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS))
+ .sessionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT,
+ TimeUnit.MILLISECONDS))
+ .baseSleepTime((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+ TimeUnit.MILLISECONDS))
+ .maxRetries(MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES))
+ .sslEnabled(MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_ENABLE))
+ .keyStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION))
+ .keyStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD))
+ .trustStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION))
+ .trustStorePassword(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD)).build();
}
/**
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
index b35dc7c..239bff6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
@@ -46,6 +46,16 @@ public class MetastoreDelegationTokenManager {
public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
"hive.cluster.delegation.token.store.zookeeper.acl";
public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation";
+ public static final String DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE =
+ "hive.cluster.delegation.token.store.zookeeper.ssl.client.enable";
+ public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION =
+ "hive.cluster.delegation.token.store.zookeeper.keystore.location";
+ public static final String DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD =
+ "hive.cluster.delegation.token.store.zookeeper.keystore.password";
+ public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION =
+ "hive.cluster.delegation.token.store.zookeeper.truststore.location";
+ public static final String DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD =
+ "hive.cluster.delegation.token.store.zookeeper.truststore.password";
public MetastoreDelegationTokenManager() {
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
index af52fcc..dd2af7e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
@@ -29,8 +29,8 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -57,12 +57,22 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
protected static final String ZK_SEQ_FORMAT = "%010d";
private static final String NODE_KEYS = "/keys";
private static final String NODE_TOKENS = "/tokens";
+ private static final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
+ + " (hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")"; // message suffix; the leading space keeps "enabled" apart from "("
private String rootNode = "";
private volatile CuratorFramework zkSession;
private String zkConnectString;
private int connectTimeoutMillis;
+ private boolean sslEnabled;
+ private String keyStoreLocation;
+ private String keyStorePassword;
+ private String trustStoreLocation;
+ private String trustStorePassword;
+
private List<ACL> newNodeAcl;
+ private Configuration conf;
+ private HadoopThriftAuthBridge.Server.ServerMode serverMode;
/**
* ACLProvider permissions will be used in case parent dirs need to be created
@@ -110,12 +120,7 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
}
}
- private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
- + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
-
- private Configuration conf;
- private HadoopThriftAuthBridge.Server.ServerMode serverMode;
/**
* Default constructor for dynamic instantiation w/ Configurable
@@ -124,14 +129,22 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
protected ZooKeeperTokenStore() {
}
- private CuratorFramework getSession() {
+ public CuratorFramework getSession() {
if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
synchronized (this) {
if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
- zkSession =
- CuratorFrameworkFactory.builder().connectString(zkConnectString)
- .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ ZooKeeperHiveHelper zkHelper = ZooKeeperHiveHelper.builder()
+ .quorum(zkConnectString)
+ .connectionTimeout(connectTimeoutMillis)
+ .maxRetries(3)
+ .baseSleepTime(1000)
+ .sslEnabled(sslEnabled)
+ .keyStoreLocation(keyStoreLocation)
+ .keyStorePassword(keyStorePassword)
+ .trustStoreLocation(trustStoreLocation)
+ .trustStorePassword(trustStorePassword)
+ .build();
+ zkSession = zkHelper.getNewZookeeperClient(aclDefaultProvider);
zkSession.start();
}
}
@@ -478,10 +491,14 @@ public class ZooKeeperTokenStore implements DelegationTokenStore {
+ WHEN_ZK_DSTORE_MSG);
}
}
- connectTimeoutMillis =
- conf.getInt(
- MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
- CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
+ connectTimeoutMillis = conf.getInt(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+ CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
+
+ sslEnabled = conf.getBoolean(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_SSL_ENABLE, false);
+ keyStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_LOCATION, "");
+ keyStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_KEYSTORE_PASSWORD, "");
+ trustStoreLocation = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_LOCATION, "");
+ trustStorePassword = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_TRUSTSTORE_PASSWORD, "");
String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null);
this.newNodeAcl = StringUtils.isNotBlank(aclStr)? parseACLs(aclStr) : getDefaultAcl(conf);