You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/25 23:29:18 UTC
[48/50] [abbrv] hive git commit: HIVE-11581: HiveServer2 should store
connection params in ZK when using dynamic service discovery for simpler
client connection string (Vaibhav Gumashta reviewed by Thejas Nair)
HIVE-11581: HiveServer2 should store connection params in ZK when using dynamic service discovery for simpler client connection string (Vaibhav Gumashta reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e54991d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e54991d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e54991d
Branch: refs/heads/llap
Commit: 0e54991d897c9acc26b015b6df82b44c0c90c6fb
Parents: dd2bdfc
Author: Vaibhav Gumashta <vg...@apache.org>
Authored: Mon Aug 24 17:14:27 2015 -0700
Committer: Vaibhav Gumashta <vg...@apache.org>
Committed: Mon Aug 24 17:15:22 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 72 ++++++------
.../org/apache/hive/jdbc/HiveConnection.java | 4 +-
jdbc/src/java/org/apache/hive/jdbc/Utils.java | 117 +++++++------------
.../hive/jdbc/ZooKeeperHiveClientHelper.java | 104 ++++++++++++++---
.../apache/hive/service/server/HiveServer2.java | 74 +++++++++++-
5 files changed, 239 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index da171b1..8706a2d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1683,22 +1683,6 @@ public class HiveConf extends Configuration {
"to construct a list exception handlers to handle exceptions thrown\n" +
"by record readers"),
- // operation log configuration
- HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true,
- "When true, HS2 will save operation logs and make them available for clients"),
- HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION("hive.server2.logging.operation.log.location",
- "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator +
- "operation_logs",
- "Top level directory where operation logs are stored if logging functionality is enabled"),
- HIVE_SERVER2_LOGGING_OPERATION_LEVEL("hive.server2.logging.operation.level", "EXECUTION",
- new StringSet("NONE", "EXECUTION", "PERFORMANCE", "VERBOSE"),
- "HS2 operation logging mode available to clients to be set at session level.\n" +
- "For this to work, hive.server2.logging.operation.enabled should be set to true.\n" +
- " NONE: Ignore any logging\n" +
- " EXECUTION: Log completion of tasks\n" +
- " PERFORMANCE: Execution + Performance logs \n" +
- " VERBOSE: All logs" ),
- HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
// logging configuration
HIVE_LOG4J_FILE("hive.log4j.file", "",
"Hive log4j configuration file.\n" +
@@ -1790,6 +1774,7 @@ public class HiveConf extends Configuration {
"hive.zookeeper.quorum in their connection string."),
HIVE_SERVER2_ZOOKEEPER_NAMESPACE("hive.server2.zookeeper.namespace", "hiveserver2",
"The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."),
+
// HiveServer2 global init file location
HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
"Either the location of a HS2 global init file or a directory containing a .hiverc file. If the \n" +
@@ -1801,6 +1786,39 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" +
"enable parallel compilation between sessions on HiveServer2. The default is false."),
+ // Tez session settings
+ HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "",
+ "A list of comma separated values corresponding to YARN queues of the same name.\n" +
+ "When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" +
+ "for multiple Tez sessions to run in parallel on the cluster."),
+ HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE("hive.server2.tez.sessions.per.default.queue", 1,
+ "A positive integer that determines the number of Tez sessions that should be\n" +
+ "launched on each of the queues specified by \"hive.server2.tez.default.queues\".\n" +
+ "Determines the parallelism on each queue."),
+ HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS("hive.server2.tez.initialize.default.sessions", false,
+ "This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" +
+ "turning on Tez for HiveServer2. The user could potentially want to run queries\n" +
+ "over Tez without the pool of sessions."),
+
+ // Operation log configuration
+ HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true,
+ "When true, HS2 will save operation logs and make them available for clients"),
+ HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION("hive.server2.logging.operation.log.location",
+ "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator +
+ "operation_logs",
+ "Top level directory where operation logs are stored if logging functionality is enabled"),
+ HIVE_SERVER2_LOGGING_OPERATION_LEVEL("hive.server2.logging.operation.level", "EXECUTION",
+ new StringSet("NONE", "EXECUTION", "PERFORMANCE", "VERBOSE"),
+ "HS2 operation logging mode available to clients to be set at session level.\n" +
+ "For this to work, hive.server2.logging.operation.enabled should be set to true.\n" +
+ " NONE: Ignore any logging\n" +
+ " EXECUTION: Log completion of tasks\n" +
+ " PERFORMANCE: Execution + Performance logs \n" +
+ " VERBOSE: All logs" ),
+
+ // Enable metric collection for HiveServer2
+ HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
+
// http (over thrift) transport settings
HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
"Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'http'."),
@@ -1816,7 +1834,7 @@ public class HiveConf extends Configuration {
"Keepalive time for an idle http worker thread. When the number of workers exceeds min workers, " +
"excessive threads are killed after this time interval."),
- // Cookie based authentication
+ // Cookie based authentication when using HTTP Transport
HIVE_SERVER2_THRIFT_HTTP_COOKIE_AUTH_ENABLED("hive.server2.thrift.http.cookie.auth.enabled", true,
"When true, HiveServer2 in HTTP transport mode, will use cookie based authentication mechanism."),
HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE("hive.server2.thrift.http.cookie.max.age", "86400s",
@@ -1963,6 +1981,8 @@ public class HiveConf extends Configuration {
" HIVE : Exposes Hive's native table types like MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW\n" +
" CLASSIC : More generic types like TABLE and VIEW"),
HIVE_SERVER2_SESSION_HOOK("hive.server2.session.hook", "", ""),
+
+ // SSL settings
HIVE_SERVER2_USE_SSL("hive.server2.use.SSL", false,
"Set this to true for using SSL encryption in HiveServer2."),
HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", "",
@@ -1983,9 +2003,6 @@ public class HiveConf extends Configuration {
"Comma separated list of udfs names. These udfs will not be allowed in queries." +
" The udf black list takes precedence over udf white list"),
- HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
- "Comma separated list of non-SQL Hive commands users are authorized to execute"),
-
HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h",
new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false),
"The check interval for session/operation timeout, which can be disabled by setting to zero or negative value."),
@@ -2002,6 +2019,8 @@ public class HiveConf extends Configuration {
" This setting takes effect only if session idle timeout (hive.server2.idle.session.timeout) and checking\n" +
"(hive.server2.session.check.interval) are enabled."),
+ HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
+ "Comma separated list of non-SQL Hive commands users are authorized to execute"),
HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
"hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
"Comma separated list of configuration options which are immutable at runtime"),
@@ -2127,19 +2146,6 @@ public class HiveConf extends Configuration {
HIVECOUNTERGROUP("hive.counters.group.name", "HIVE",
"The name of counter group for internal Hive variables (CREATED_FILE, FATAL_ERROR, etc.)"),
- HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "",
- "A list of comma separated values corresponding to YARN queues of the same name.\n" +
- "When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" +
- "for multiple Tez sessions to run in parallel on the cluster."),
- HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE("hive.server2.tez.sessions.per.default.queue", 1,
- "A positive integer that determines the number of Tez sessions that should be\n" +
- "launched on each of the queues specified by \"hive.server2.tez.default.queues\".\n" +
- "Determines the parallelism on each queue."),
- HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS("hive.server2.tez.initialize.default.sessions", false,
- "This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" +
- "turning on Tez for HiveServer2. The user could potentially want to run queries\n" +
- "over Tez without the pool of sessions."),
-
HIVE_QUOTEDID_SUPPORT("hive.support.quoted.identifiers", "column",
new StringSet("none", "column"),
"Whether to use quoted identifier. 'none' or 'column' can be used. \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index a9dac03..ba971fd 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -211,13 +211,13 @@ public class HiveConnection implements java.sql.Connection {
break;
} catch (TTransportException e) {
LOG.info("Could not open client transport with JDBC Uri: " + jdbcUriString);
- // We'll retry till we exhaust all HiveServer2 uris from ZooKeeper
+ // We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper
if ((sessConfMap.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE) != null)
&& (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(sessConfMap
.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE)))) {
try {
// Update jdbcUriString, host & port variables in connParams
- // Throw an exception if all HiveServer2 uris have been exhausted,
+ // Throw an exception if all HiveServer2 nodes have been exhausted,
// or if we're unable to connect to ZooKeeper.
Utils.updateConnParamsFromZooKeeper(connParams);
} catch (ZooKeeperHiveClientException ze) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/Utils.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 0e4693b..d8368a4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -19,7 +19,6 @@
package org.apache.hive.jdbc;
import java.net.URI;
-import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,22 +36,22 @@ import org.apache.hive.service.cli.thrift.TStatusCode;
import org.apache.http.client.CookieStore;
import org.apache.http.cookie.Cookie;
-public class Utils {
- public static final Log LOG = LogFactory.getLog(Utils.class.getName());
+class Utils {
+ static final Log LOG = LogFactory.getLog(Utils.class.getName());
/**
* The required prefix for the connection URL.
*/
- public static final String URL_PREFIX = "jdbc:hive2://";
+ static final String URL_PREFIX = "jdbc:hive2://";
/**
* If host is provided, without a port.
*/
- public static final String DEFAULT_PORT = "10000";
+ static final String DEFAULT_PORT = "10000";
/**
* Hive's default database name
*/
- public static final String DEFAULT_DATABASE = "default";
+ static final String DEFAULT_DATABASE = "default";
private static final String URI_JDBC_PREFIX = "jdbc:";
@@ -63,7 +62,7 @@ public class Utils {
static final String HIVE_SERVER2_RETRY_TRUE = "true";
static final String HIVE_SERVER2_RETRY_FALSE = "false";
- public static class JdbcConnectionParams {
+ static class JdbcConnectionParams {
// Note on client side parameter naming convention:
// Prefer using a shorter camelCase param name instead of using the same name as the
// corresponding
@@ -129,7 +128,7 @@ public class Utils {
static final String SSL_TRUST_STORE_TYPE = "JKS";
private String host = null;
- private int port;
+ private int port = 0;
private String jdbcUriString;
private String dbName = DEFAULT_DATABASE;
private Map<String,String> hiveConfs = new LinkedHashMap<String,String>();
@@ -238,17 +237,17 @@ public class Utils {
}
// Verify success or success_with_info status, else throw SQLException
- public static void verifySuccessWithInfo(TStatus status) throws SQLException {
+ static void verifySuccessWithInfo(TStatus status) throws SQLException {
verifySuccess(status, true);
}
// Verify success status, else throw SQLException
- public static void verifySuccess(TStatus status) throws SQLException {
+ static void verifySuccess(TStatus status) throws SQLException {
verifySuccess(status, false);
}
// Verify success and optionally with_info status, else throw SQLException
- public static void verifySuccess(TStatus status, boolean withInfo) throws SQLException {
+ static void verifySuccess(TStatus status, boolean withInfo) throws SQLException {
if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS ||
(withInfo && status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS)) {
return;
@@ -279,7 +278,7 @@ public class Utils {
* @return
* @throws SQLException
*/
- public static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException,
+ static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException,
SQLException, ZooKeeperHiveClientException {
JdbcConnectionParams connParams = new JdbcConnectionParams();
@@ -383,7 +382,6 @@ public class Utils {
newUsage = usageUrlBase + JdbcConnectionParams.HTTP_PATH + "=<http_path_value>";
handleParamDeprecation(connParams.getHiveConfs(), connParams.getSessionVars(),
JdbcConnectionParams.HTTP_PATH_DEPRECATED, JdbcConnectionParams.HTTP_PATH, newUsage);
-
// Extract host, port
if (connParams.isEmbeddedMode()) {
// In case of embedded mode we were supplied with an empty authority.
@@ -391,23 +389,15 @@ public class Utils {
connParams.setHost(jdbcURI.getHost());
connParams.setPort(jdbcURI.getPort());
} else {
- // Else substitute the dummy authority with a resolved one.
- // In case of dynamic service discovery using ZooKeeper, it picks a server uri from ZooKeeper
- String resolvedAuthorityString = resolveAuthority(connParams);
- LOG.info("Resolved authority: " + resolvedAuthorityString);
- uri = uri.replace(dummyAuthorityString, resolvedAuthorityString);
+ // Configure host, port and params from ZooKeeper if used,
+ // and substitute the dummy authority with a resolved one
+ configureConnParams(connParams);
+ // We check for invalid host, port while configuring connParams with configureConnParams()
+ String authorityStr = connParams.getHost() + ":" + connParams.getPort();
+ LOG.info("Resolved authority: " + authorityStr);
+ uri = uri.replace(dummyAuthorityString, authorityStr);
connParams.setJdbcUriString(uri);
- // Create a Java URI from the resolved URI for extracting the host/port
- URI resolvedAuthorityURI = null;
- try {
- resolvedAuthorityURI = new URI(null, resolvedAuthorityString, null, null, null);
- } catch (URISyntaxException e) {
- throw new JdbcUriParseException("Bad URL format: ", e);
- }
- connParams.setHost(resolvedAuthorityURI.getHost());
- connParams.setPort(resolvedAuthorityURI.getPort());
}
-
return connParams;
}
@@ -471,22 +461,17 @@ public class Utils {
return authorities;
}
- /**
- * Get a string representing a specific host:port
- * @param connParams
- * @return
- * @throws JdbcUriParseException
- * @throws ZooKeeperHiveClientException
- */
- private static String resolveAuthority(JdbcConnectionParams connParams)
+ private static void configureConnParams(JdbcConnectionParams connParams)
throws JdbcUriParseException, ZooKeeperHiveClientException {
String serviceDiscoveryMode =
connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
if ((serviceDiscoveryMode != null)
&& (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER
.equalsIgnoreCase(serviceDiscoveryMode))) {
- // Resolve using ZooKeeper
- return resolveAuthorityUsingZooKeeper(connParams);
+ // Set ZooKeeper ensemble in connParams for later use
+ connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
+ // Configure using ZooKeeper
+ ZooKeeperHiveClientHelper.configureConnParams(connParams);
} else {
String authority = connParams.getAuthorityList()[0];
URI jdbcURI = URI.create(URI_HIVE_PREFIX + "//" + authority);
@@ -494,32 +479,28 @@ public class Utils {
// to separate the 'path' portion of URI can result in this.
// The missing "/" common typo while using secure mode, eg of such url -
// jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM
- if ((jdbcURI.getAuthority() != null) && (jdbcURI.getHost() == null)) {
- throw new JdbcUriParseException("Bad URL format. Hostname not found "
- + " in authority part of the url: " + jdbcURI.getAuthority()
- + ". Are you missing a '/' after the hostname ?");
+ if (jdbcURI.getAuthority() != null) {
+ String host = jdbcURI.getHost();
+ int port = jdbcURI.getPort();
+ if (host == null) {
+ throw new JdbcUriParseException("Bad URL format. Hostname not found "
+ + " in authority part of the url: " + jdbcURI.getAuthority()
+ + ". Are you missing a '/' after the hostname ?");
+ }
+ // Set the port to default value; we do support jdbc url like:
+ // jdbc:hive2://localhost/db
+ if (port <= 0) {
+ port = Integer.parseInt(Utils.DEFAULT_PORT);
+ }
+ // Store the validated host and the defaulted port rather than the raw URI values:
+ // URI.getPort() returns -1 when the url carries no explicit port
+ connParams.setHost(host);
+ connParams.setPort(port);
}
- // Return the 1st element of the array
- return jdbcURI.getAuthority();
}
}
/**
- * Read a specific host:port from ZooKeeper
- * @param connParams
- * @return
- * @throws ZooKeeperHiveClientException
- */
- private static String resolveAuthorityUsingZooKeeper(JdbcConnectionParams connParams)
- throws ZooKeeperHiveClientException {
- // Set ZooKeeper ensemble in connParams for later use
- connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
- return ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
- }
-
- /**
* Read the next server coordinates (host:port combo) from ZooKeeper. Ignore the znodes already
- * explored. Also update the host, port, jdbcUriString fields of connParams.
+ * explored. Also update the host, port, jdbcUriString and other configs published by the server.
*
* @param connParams
* @throws ZooKeeperHiveClientException
@@ -528,25 +509,13 @@ public class Utils {
throws ZooKeeperHiveClientException {
// Add current host to the rejected list
connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath());
- // Get another HiveServer2 uri from ZooKeeper
- String serverUriString = ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
- // Parse serverUri to a java URI and extract host, port
- URI serverUri = null;
- try {
- // Note URL_PREFIX is not a valid scheme format, therefore leaving it null in the constructor
- // to construct a valid URI
- serverUri = new URI(null, serverUriString, null, null, null);
- } catch (URISyntaxException e) {
- throw new ZooKeeperHiveClientException(e);
- }
String oldServerHost = connParams.getHost();
int oldServerPort = connParams.getPort();
- String newServerHost = serverUri.getHost();
- int newServerPort = serverUri.getPort();
- connParams.setHost(newServerHost);
- connParams.setPort(newServerPort);
+ // Update connection params (including host, port) from ZooKeeper
+ ZooKeeperHiveClientHelper.configureConnParams(connParams);
connParams.setJdbcUriString(connParams.getJdbcUriString().replace(
- oldServerHost + ":" + oldServerPort, newServerHost + ":" + newServerPort));
+ oldServerHost + ":" + oldServerPort, connParams.getHost() + ":" + connParams.getPort()));
+ LOG.info("Selected HiveServer2 instance with uri: " + connParams.getJdbcUriString());
}
private static String joinStringArray(String[] stringArray, String seperator) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
index e24b3dc..eeb3cf9 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
@@ -19,9 +19,10 @@
package org.apache.hive.jdbc;
import java.nio.charset.Charset;
-import java.sql.SQLException;
import java.util.List;
import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,26 +32,19 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.apache.zookeeper.Watcher;
-public class ZooKeeperHiveClientHelper {
- public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
-
+class ZooKeeperHiveClientHelper {
+ static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
+ // Pattern for key1=value1;key2=value2
+ private static final Pattern kvPattern = Pattern.compile("([^=;]*)=([^;]*)[;]?");
/**
* A no-op watcher class
*/
- public static class DummyWatcher implements Watcher {
+ static class DummyWatcher implements Watcher {
public void process(org.apache.zookeeper.WatchedEvent event) {
}
}
- /**
- * Resolve to a host:port by connecting to ZooKeeper and picking a host randomly.
- *
- * @param uri
- * @param connParams
- * @return
- * @throws SQLException
- */
- static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams)
+ static void configureConnParams(JdbcConnectionParams connParams)
throws ZooKeeperHiveClientException {
String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
String zooKeeperNamespace =
@@ -73,17 +67,17 @@ public class ZooKeeperHiveClientHelper {
throw new ZooKeeperHiveClientException(
"Tried all existing HiveServer2 uris from ZooKeeper.");
}
- // Now pick a host randomly
+ // Now pick a server node randomly
serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
connParams.setCurrentHostZnodePath(serverNode);
- String serverUri =
+ // Read config string from the znode for this server node
+ String serverConfStr =
new String(
zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode),
Charset.forName("UTF-8"));
- LOG.info("Selected HiveServer2 instance with uri: " + serverUri);
- return serverUri;
+ applyConfs(serverConfStr, connParams);
} catch (Exception e) {
- throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e);
+ throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e);
} finally {
// Close the client connection with ZooKeeper
if (zooKeeperClient != null) {
@@ -91,4 +85,76 @@ public class ZooKeeperHiveClientHelper {
}
}
}
+
+ /**
+ * Apply configs published by the server. Session configs specified in the client's JDBC URI
+ * override configs published by the server. The host and port are always taken from the
+ * server node, so that a retry against another node replaces the previous node's address.
+ *
+ * @param serverConfStr config string read from the server's znode (key1=value1;key2=value2)
+ * @param connParams connection params to update in place
+ * @throws Exception if a published config has a null value or a port is not a valid integer
+ */
+ private static void applyConfs(String serverConfStr, JdbcConnectionParams connParams)
+ throws Exception {
+ Matcher matcher = kvPattern.matcher(serverConfStr);
+ while (matcher.find()) {
+ String key = matcher.group(1);
+ String value = matcher.group(2);
+ if (key == null) {
+ continue;
+ }
+ if (value == null) {
+ throw new Exception("Null config value for: " + key + " published by the server.");
+ }
+ // Have to use this if-else since switch-case on String is supported Java 7 onwards
+ if (key.equals("hive.server2.thrift.bind.host")) {
+ // Always set host and port: when retrying, connParams still holds the address of the
+ // node that just failed, and skipping the update would reconnect to that same node
+ connParams.setHost(value);
+ } else if (key.equals("hive.server2.thrift.port")
+ || key.equals("hive.server2.thrift.http.port")) {
+ connParams.setPort(Integer.parseInt(value));
+ } else if (key.equals("hive.server2.transport.mode")) {
+ setSessionVarIfAbsent(connParams, JdbcConnectionParams.TRANSPORT_MODE, value);
+ } else if (key.equals("hive.server2.thrift.sasl.qop")) {
+ setSessionVarIfAbsent(connParams, JdbcConnectionParams.AUTH_QOP, value);
+ } else if (key.equals("hive.server2.thrift.http.path")) {
+ setSessionVarIfAbsent(connParams, JdbcConnectionParams.HTTP_PATH, "/" + value);
+ } else if (key.equals("hive.server2.use.SSL")) {
+ setSessionVarIfAbsent(connParams, JdbcConnectionParams.USE_SSL, value);
+ } else if (key.equals("hive.server2.authentication")) {
+ // Note that in JDBC driver, we have 3 auth modes: NOSASL, Kerberos and password based
+ // The use of "JdbcConnectionParams.AUTH_TYPE=JdbcConnectionParams.AUTH_SIMPLE" picks NOSASL
+ // The presence of "JdbcConnectionParams.AUTH_PRINCIPAL=<principal>" picks Kerberos
+ // Otherwise password based (which includes NONE, PAM, LDAP, CUSTOM)
+ if (value.equalsIgnoreCase("NOSASL")) {
+ setSessionVarIfAbsent(connParams, JdbcConnectionParams.AUTH_TYPE,
+ JdbcConnectionParams.AUTH_SIMPLE);
+ }
+ } else if (key.equals("hive.server2.authentication.kerberos.principal")) {
+ setSessionVarIfAbsent(connParams, JdbcConnectionParams.AUTH_PRINCIPAL, value);
+ }
+ }
+ }
+
+ /**
+ * Set a session variable from a server-published value, unless the client's JDBC URI
+ * already supplied that variable.
+ */
+ private static void setSessionVarIfAbsent(JdbcConnectionParams connParams, String name,
+ String value) {
+ if (!connParams.getSessionVars().containsKey(name)) {
+ connParams.getSessionVars().put(name, value);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 4a4be97..d7ba964 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -21,7 +21,9 @@ package org.apache.hive.service.server;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -69,6 +71,8 @@ import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
+import com.google.common.base.Joiner;
+
/**
* HiveServer2.
*
@@ -100,7 +104,12 @@ public class HiveServer2 extends CompositeService {
}
addService(thriftCLIService);
super.init(hiveConf);
-
+ // Set host name in hiveConf, so that it can later be read back from the conf
+ // (e.g. when publishing this instance's configs to ZooKeeper)
+ try {
+ hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getServerHost());
+ } catch (Throwable t) {
+ throw new Error("Unable to initialize HiveServer2", t);
+ }
// Add a shutdown hook for catching SIGTERM & SIGINT
final HiveServer2 hiveServer2 = this;
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -122,6 +131,14 @@ public class HiveServer2 extends CompositeService {
return false;
}
+ /**
+ * Tell whether the given conf selects Kerberos authentication for HiveServer2.
+ *
+ * @param hiveConf conf to read hive.server2.authentication from
+ * @return true if the configured auth mode is "KERBEROS" (compared ignoring case)
+ */
+ public static boolean isKerberosAuthMode(HiveConf hiveConf) {
+ // Constant-first comparison is false for a null auth mode
+ return "KERBEROS".equalsIgnoreCase(
+ hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION));
+ }
+
/**
* ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
*/
@@ -158,9 +175,12 @@ public class HiveServer2 extends CompositeService {
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
- String instanceURI = getServerInstanceURI(hiveConf);
- byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
+ String instanceURI = getServerInstanceURI();
setUpZooKeeperAuth(hiveConf);
+ // HiveServer2 configs that this instance will publish to ZooKeeper,
+ // so that the clients can read these and configure themselves properly.
+ Map<String, String> confsToPublish = new HashMap<String, String>();
+ addConfsToPublish(hiveConf, confsToPublish);
int sessionTimeout =
(int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
TimeUnit.MILLISECONDS);
@@ -193,6 +213,10 @@ public class HiveServer2 extends CompositeService {
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+ "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
+ String znodeData = "";
+ // Publish configs for this instance as the data on the node
+ znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
+ byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8"));
znode =
new PersistentEphemeralNode(zooKeeperClient,
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
@@ -220,6 +244,41 @@ public class HiveServer2 extends CompositeService {
}
/**
+ * Add conf keys, values that HiveServer2 will publish to ZooKeeper.
+ * Integer and boolean settings are read with their typed accessors and converted to strings.
+ *
+ * @param hiveConf conf to read the values from
+ * @param confsToPublish map that receives the conf name to value entries
+ */
+ private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish) {
+ // Hostname
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST));
+ // Transport mode
+ confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE));
+ // Transport specific confs
+ if (isHTTPTransportMode(hiveConf)) {
+ // The http port is declared with an integer default, so read it as an int
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname,
+ Integer.toString(hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT)));
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
+ } else {
+ // NOTE(review): hive.server2.thrift.port is assumed to be integer-typed like the
+ // http port; confirm against its HiveConf declaration
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname,
+ Integer.toString(hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT)));
+ confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP));
+ }
+ // Auth specific confs
+ confsToPublish.put(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION));
+ if (isKerberosAuthMode(hiveConf)) {
+ confsToPublish.put(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname,
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
+ }
+ // SSL conf: declared with a boolean default, so read it as a boolean
+ confsToPublish.put(ConfVars.HIVE_SERVER2_USE_SSL.varname,
+ Boolean.toString(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)));
+ }
+
+ /**
* For a kerberized cluster, we dynamically set up the client's JAAS conf.
*
* @param hiveConf
@@ -289,7 +348,7 @@ public class HiveServer2 extends CompositeService {
this.registeredWithZooKeeper = registeredWithZooKeeper;
}
- private String getServerInstanceURI(HiveConf hiveConf) throws Exception {
+ private String getServerInstanceURI() throws Exception {
if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
}
@@ -297,6 +356,13 @@ public class HiveServer2 extends CompositeService {
+ thriftCLIService.getPortNumber();
}
+ /**
+ * Get the host name of the address reported by the thrift CLI service.
+ *
+ * @return the host name of the server address
+ * @throws Exception if the thrift service or its server address is not available yet
+ */
+ private String getServerHost() throws Exception {
+ boolean addressKnown =
+ (thriftCLIService != null) && (thriftCLIService.getServerIPAddress() != null);
+ if (!addressKnown) {
+ throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
+ }
+ return thriftCLIService.getServerIPAddress().getHostName();
+ }
+
@Override
public synchronized void start() {
super.start();