You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2014/09/16 21:40:52 UTC
svn commit: r1625363 [1/2] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
itests/hive-unit/src/test/java/org/apache/hive/beeline/
itests/hive-unit/src/test/java/org/apache/hive/jdbc/ jdbc/
jdbc/src/java/org/apache/hive/jdbc/ ql/src/java...
Author: vgumashta
Date: Tue Sep 16 19:40:51 2014
New Revision: 1625363
URL: http://svn.apache.org/r1625363
Log:
HIVE-7935: Support dynamic service discovery for HiveServer2 (Vaibhav Gumashta reviewed by Thejas Nair, Alan Gates)
Added:
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
hive/trunk/jdbc/pom.xml
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java
hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Sep 16 19:40:51 2014
@@ -205,7 +205,7 @@ public class HiveConf extends Configurat
PLAN_SERIALIZATION("hive.plan.serialization.format", "kryo",
"Query plan format serialization between client and task nodes. \n" +
"Two supported values are : kryo and javaXML. Kryo is default."),
- SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive",
+ SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive",
"HDFS root scratch dir for Hive jobs which gets created with 777 permission. " +
"For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, " +
"with ${hive.scratch.dir.permission}."),
@@ -215,7 +215,7 @@ public class HiveConf extends Configurat
DOWNLOADED_RESOURCES_DIR("hive.downloaded.resources.dir",
"${system:java.io.tmpdir}" + File.separator + "${hive.session.id}_resources",
"Temporary local directory for added resources in the remote file system."),
- SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700",
+ SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700",
"The permission for the user specific scratch directories that get created."),
SUBMITVIACHILD("hive.exec.submitviachild", false, ""),
SUBMITLOCALTASKVIACHILD("hive.exec.submit.local.task.via.child", true,
@@ -1243,10 +1243,16 @@ public class HiveConf extends Configurat
"This param is to control whether or not only do lock on queries\n" +
"that need to execute at least one mapred job."),
+ // Zookeeper related configs
HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "",
- "The list of ZooKeeper servers to talk to. This is only needed for read/write locks."),
+ "List of ZooKeeper servers to talk to. This is needed for: " +
+ "1. Read/write locks - when hive.lock.manager is set to " +
+ "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, " +
+ "2. When HiveServer2 supports service discovery via Zookeeper."),
HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181",
- "The port of ZooKeeper servers to talk to. This is only needed for read/write locks."),
+ "The port of ZooKeeper servers to talk to. " +
+ "If the list of Zookeeper servers specified in hive.zookeeper.quorum," +
+ "does not contain port numbers, this value is used."),
HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000,
"ZooKeeper client's session timeout. The client is disconnected, and as a result, all locks released, \n" +
"if a heartbeat is not sent in the timeout."),
@@ -1446,11 +1452,6 @@ public class HiveConf extends Configurat
"If the property is set, the value must be a valid URI (java.net.URI, e.g. \"file:///tmp/my-logging.properties\"), \n" +
"which you can then extract a URL from and pass to PropertyConfigurator.configure(URL)."),
- // Hive global init file location
- HIVE_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
- "The location of HS2 global init file (.hiverc).\n" +
- "If the property is reset, the value must be a valid path where the init file is located."),
-
// prefix used to auto generated column aliases (this should be started with '_')
HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL("hive.autogen.columnalias.prefix.label", "_c",
"String used as a prefix when auto generating column alias.\n" +
@@ -1489,16 +1490,29 @@ public class HiveConf extends Configurat
"table. From 0.12 onwards, they are displayed separately. This flag will let you\n" +
"get old behavior, if desired. See, test-case in patch for HIVE-6689."),
+ // HiveServer2 specific configs
HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null),
- "This number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds between retries. \n" +
- "The default of 30 will keep trying for 30 minutes."),
-
+ "Number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds " +
+ "between retries. \n The default of 30 will keep trying for 30 minutes."),
+ HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY("hive.server2.support.dynamic.service.discovery", false,
+ "Whether HiveServer2 supports dynamic service discovery for its clients. " +
+ "To support this, each instance of HiveServer2 currently uses ZooKeeper to register itself, " +
+ "when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: " +
+ "hive.zookeeper.quorum in their connection string."),
+ HIVE_SERVER2_ZOOKEEPER_NAMESPACE("hive.server2.zookeeper.namespace", "hiveserver2",
+ "The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."),
+ // HiveServer2 global init file location
+ HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
+ "The location of HS2 global init file (.hiverc).\n" +
+ "If the property is reset, the value must be a valid path where the init file is located."),
HIVE_SERVER2_TRANSPORT_MODE("hive.server2.transport.mode", "binary", new StringSet("binary", "http"),
"Transport mode of HiveServer2."),
+ HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
+ "Bind host on which to run the HiveServer2 Thrift service."),
// http (over thrift) transport settings
HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
- "Port number when in HTTP mode."),
+ "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'http'."),
HIVE_SERVER2_THRIFT_HTTP_PATH("hive.server2.thrift.http.path", "cliservice",
"Path component of URL endpoint when in HTTP mode."),
HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS("hive.server2.thrift.http.min.worker.threads", 5,
@@ -1515,11 +1529,7 @@ public class HiveConf extends Configurat
// binary transport settings
HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000,
- "Port number of HiveServer2 Thrift interface.\n" +
- "Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT"),
- HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
- "Bind host on which to run the HiveServer2 Thrift interface.\n" +
- "Can be overridden by setting $HIVE_SERVER2_THRIFT_BIND_HOST"),
+ "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'binary'."),
// hadoop.rpc.protection being set to a higher level than HiveServer2
// does not make sense in most situations.
// HiveServer2 ignores hadoop.rpc.protection in favor of hive.server2.thrift.sasl.qop.
Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java Tue Sep 16 19:40:51 2014
@@ -210,7 +210,7 @@ public class TestBeeLineWithArgs {
}
scriptFile.delete();
}
-
+
/**
* Test that BeeLine will read comment lines that start with whitespace
* @throws Throwable
Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java Tue Sep 16 19:40:51 2014
@@ -262,10 +262,9 @@ public class TestJdbcDriver2 {
private void checkBadUrl(String url) throws SQLException {
try{
DriverManager.getConnection(url, "", "");
- fail("should have thrown IllegalArgumentException but did not ");
- } catch(SQLException i) {
- assertTrue(i.getMessage().contains("Bad URL format. Hostname not found "
- + " in authority part of the url"));
+ fail("Should have thrown JdbcUriParseException but did not ");
+ } catch(JdbcUriParseException e) {
+ assertTrue(e.getMessage().contains("Bad URL format"));
}
}
@@ -1618,6 +1617,10 @@ public class TestJdbcDriver2 {
// [url] [host] [port] [db]
private static final String[][] URL_PROPERTIES = new String[][] {
// binary mode
+ // For embedded mode, the JDBC uri is of the form:
+ // jdbc:hive2:///dbName;sess_var_list?hive_conf_list#hive_var_list
+ // and does not contain host:port string.
+ // As a result port is parsed to '-1' per the Java URI conventions
{"jdbc:hive2://", "", "", "default"},
{"jdbc:hive2://localhost:10001/default", "localhost", "10001", "default"},
{"jdbc:hive2://localhost/notdefault", "localhost", "10000", "notdefault"},
@@ -1654,7 +1657,8 @@ public class TestJdbcDriver2 {
};
@Test
- public void testParseUrlHttpMode() throws SQLException {
+ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException,
+ ZooKeeperHiveClientException {
new HiveDriver();
for (String[] testValues : HTTP_URL_PROPERTIES) {
JdbcConnectionParams params = Utils.parseURL(testValues[0]);
Modified: hive/trunk/jdbc/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/pom.xml?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/jdbc/pom.xml (original)
+++ hive/trunk/jdbc/pom.xml Tue Sep 16 19:40:51 2014
@@ -80,6 +80,17 @@
<artifactId>libthrift</artifactId>
<version>${libthrift.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<profiles>
Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java Tue Sep 16 19:40:51 2014
@@ -53,6 +53,7 @@ import javax.security.sasl.SaslException
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.KerberosSaslHelper;
import org.apache.hive.service.auth.PlainSaslHelper;
@@ -86,37 +87,20 @@ import org.apache.thrift.transport.TTran
*/
public class HiveConnection implements java.sql.Connection {
public static final Log LOG = LogFactory.getLog(HiveConnection.class.getName());
- private static final String HIVE_AUTH_TYPE= "auth";
- private static final String HIVE_AUTH_QOP = "sasl.qop";
- private static final String HIVE_AUTH_SIMPLE = "noSasl";
- private static final String HIVE_AUTH_TOKEN = "delegationToken";
- private static final String HIVE_AUTH_USER = "user";
- private static final String HIVE_AUTH_PRINCIPAL = "principal";
- private static final String HIVE_AUTH_PASSWD = "password";
- private static final String HIVE_AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
- private static final String HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
- private static final String HIVE_ANONYMOUS_USER = "anonymous";
- private static final String HIVE_ANONYMOUS_PASSWD = "anonymous";
- private static final String HIVE_USE_SSL = "ssl";
- private static final String HIVE_SSL_TRUST_STORE = "sslTrustStore";
- private static final String HIVE_SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
- private static final String HIVE_SERVER2_TRANSPORT_MODE = "hive.server2.transport.mode";
- private static final String HIVE_SERVER2_THRIFT_HTTP_PATH = "hive.server2.thrift.http.path";
private static final String HIVE_VAR_PREFIX = "hivevar:";
private static final String HIVE_CONF_PREFIX = "hiveconf:";
- // Currently supports JKS keystore format
- // See HIVE-6286 (Add support for PKCS12 keystore format)
- private static final String HIVE_SSL_TRUST_STORE_TYPE = "JKS";
-
- private final String jdbcURI;
- private final String host;
- private final int port;
+
+ private String jdbcUriString;
+ private String host;
+ private int port;
private final Map<String, String> sessConfMap;
private final Map<String, String> hiveConfMap;
private final Map<String, String> hiveVarMap;
+ private JdbcConnectionParams connParams;
private final boolean isEmbeddedMode;
private TTransport transport;
- private TCLIService.Iface client; // todo should be replaced by CliServiceClient
+ // TODO should be replaced by CliServiceClient
+ private TCLIService.Iface client;
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TSessionHandle sessHandle = null;
@@ -126,14 +110,12 @@ public class HiveConnection implements j
public HiveConnection(String uri, Properties info) throws SQLException {
setupLoginTimeout();
- jdbcURI = uri;
- // parse the connection uri
- Utils.JdbcConnectionParams connParams;
try {
connParams = Utils.parseURL(uri);
- } catch (IllegalArgumentException e) {
+ } catch (ZooKeeperHiveClientException e) {
throw new SQLException(e);
}
+ jdbcUriString = connParams.getJdbcUriString();
// extract parsed connection parameters:
// JDBC URL: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
// each list: <key1>=<val1>;<key2>=<val2> and so on
@@ -164,14 +146,14 @@ public class HiveConnection implements j
} else {
// extract user/password from JDBC connection properties if its not supplied in the
// connection URL
- if (info.containsKey(HIVE_AUTH_USER)) {
- sessConfMap.put(HIVE_AUTH_USER, info.getProperty(HIVE_AUTH_USER));
- if (info.containsKey(HIVE_AUTH_PASSWD)) {
- sessConfMap.put(HIVE_AUTH_PASSWD, info.getProperty(HIVE_AUTH_PASSWD));
+ if (info.containsKey(JdbcConnectionParams.AUTH_USER)) {
+ sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER));
+ if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) {
+ sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD));
}
}
- if (info.containsKey(HIVE_AUTH_TYPE)) {
- sessConfMap.put(HIVE_AUTH_TYPE, info.getProperty(HIVE_AUTH_TYPE));
+ if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) {
+ sessConfMap.put(JdbcConnectionParams.AUTH_TYPE, info.getProperty(JdbcConnectionParams.AUTH_TYPE));
}
// open the client transport
openTransport();
@@ -189,19 +171,44 @@ public class HiveConnection implements j
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
// open client session
- openSession(connParams);
+ openSession();
}
private void openTransport() throws SQLException {
- // TODO: Refactor transport creation to a factory, it's getting uber messy here
- transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
- try {
- if (!transport.isOpen()) {
- transport.open();
+ while (true) {
+ try {
+ transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
+ if (!transport.isOpen()) {
+ LOG.info("Will try to open client transport with JDBC Uri: " + jdbcUriString);
+ transport.open();
+ }
+ break;
+ } catch (TTransportException e) {
+ LOG.info("Could not open client transport with JDBC Uri: " + jdbcUriString);
+ // We'll retry till we exhaust all HiveServer2 uris from ZooKeeper
+ if ((sessConfMap.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE) != null)
+ && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(sessConfMap
+ .get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE)))) {
+ try {
+ // Update jdbcUriString, host & port variables in connParams
+ // Throw an exception if all HiveServer2 uris have been exhausted,
+ // or if we're unable to connect to ZooKeeper.
+ Utils.updateConnParamsFromZooKeeper(connParams);
+ } catch (ZooKeeperHiveClientException ze) {
+ throw new SQLException(
+ "Could not open client transport for any of the Server URI's in ZooKeeper: "
+ + ze.getMessage(), " 08S01", ze);
+ }
+ // Update with new values
+ jdbcUriString = connParams.getJdbcUriString();
+ host = connParams.getHost();
+ port = connParams.getPort();
+ LOG.info("Will retry opening client transport");
+ } else {
+ throw new SQLException("Could not open client transport with JDBC Uri: " + jdbcUriString
+ + ": " + e.getMessage(), " 08S01", e);
+ }
}
- } catch (TTransportException e) {
- throw new SQLException("Could not open connection to "
- + jdbcURI + ": " + e.getMessage(), " 08S01", e);
}
}
@@ -211,37 +218,36 @@ public class HiveConnection implements j
String schemeName = useSsl ? "https" : "http";
// http path should begin with "/"
String httpPath;
- httpPath = hiveConfMap.get(HIVE_SERVER2_THRIFT_HTTP_PATH);
- if(httpPath == null) {
+ httpPath = hiveConfMap.get(JdbcConnectionParams.HTTP_PATH);
+ if (httpPath == null) {
httpPath = "/";
- }
- else if(!httpPath.startsWith("/")) {
+ } else if (!httpPath.startsWith("/")) {
httpPath = "/" + httpPath;
}
- return schemeName + "://" + host + ":" + port + httpPath;
+ return schemeName + "://" + host + ":" + port + httpPath;
}
- private TTransport createHttpTransport() throws SQLException {
+ private TTransport createHttpTransport() throws SQLException, TTransportException {
DefaultHttpClient httpClient;
-
boolean useSsl = isSslConnection();
-
// Create an http client from the configs
- try {
- httpClient = getHttpClient(useSsl);
- } catch (Exception e) {
- String msg = "Could not create http connection to " +
- jdbcURI + ". " + e.getMessage();
- throw new SQLException(msg, " 08S01", e);
- }
-
+ httpClient = getHttpClient(useSsl);
try {
transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
+ // We'll call an open/close here to send a test HTTP message to the server. Any
+ // TTransportException caused by trying to connect to an unavailable peer is thrown here.
+ // Bubbling them up the call hierarchy so that a retry can happen in openTransport,
+ // if dynamic service discovery is configured.
+ TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+ TOpenSessionResp openResp = client.OpenSession(new TOpenSessionReq());
+ if (openResp != null) {
+ client.CloseSession(new TCloseSessionReq(openResp.getSessionHandle()));
+ }
}
- catch (TTransportException e) {
+ catch (TException e) {
String msg = "Could not create http connection to " +
- jdbcURI + ". " + e.getMessage();
- throw new SQLException(msg, " 08S01", e);
+ jdbcUriString + ". " + e.getMessage();
+ throw new TTransportException(msg, e);
}
return transport;
}
@@ -263,7 +269,7 @@ public class HiveConnection implements j
* for sending to the server before every request.
*/
requestInterceptor = new HttpKerberosRequestInterceptor(
- sessConfMap.get(HIVE_AUTH_PRINCIPAL), host, getServerHttpUrl(false));
+ sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host, getServerHttpUrl(false));
}
else {
/**
@@ -273,11 +279,23 @@ public class HiveConnection implements j
requestInterceptor = new HttpBasicAuthInterceptor(getUserName(), getPassword());
// Configure httpClient for SSL
if (useSsl) {
- String sslTrustStorePath = sessConfMap.get(HIVE_SSL_TRUST_STORE);
+ String sslTrustStorePath = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE);
String sslTrustStorePassword = sessConfMap.get(
- HIVE_SSL_TRUST_STORE_PASSWORD);
+ JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
KeyStore sslTrustStore;
SSLSocketFactory socketFactory;
+ /**
+ * The code within the try block throws:
+ * 1. SSLInitializationException
+ * 2. KeyStoreException
+ * 3. IOException
+ * 4. NoSuchAlgorithmException
+ * 5. CertificateException
+ * 6. KeyManagementException
+ * 7. UnrecoverableKeyException
+ * We don't want the client to retry on any of these, hence we catch all
+ * and throw a SQLException.
+ */
try {
if (sslTrustStorePath == null || sslTrustStorePath.isEmpty()) {
// Create a default socket factory based on standard JSSE trust material
@@ -285,7 +303,7 @@ public class HiveConnection implements j
}
else {
// Pick trust store config from the given path
- sslTrustStore = KeyStore.getInstance(HIVE_SSL_TRUST_STORE_TYPE);
+ sslTrustStore = KeyStore.getInstance(JdbcConnectionParams.SSL_TRUST_STORE_TYPE);
sslTrustStore.load(new FileInputStream(sslTrustStorePath),
sslTrustStorePassword.toCharArray());
socketFactory = new SSLSocketFactory(sslTrustStore);
@@ -296,7 +314,7 @@ public class HiveConnection implements j
}
catch (Exception e) {
String msg = "Could not create an https connection to " +
- jdbcURI + ". " + e.getMessage();
+ jdbcUriString + ". " + e.getMessage();
throw new SQLException(msg, " 08S01", e);
}
}
@@ -316,29 +334,32 @@ public class HiveConnection implements j
* - Raw (non-SASL) socket
*
* Kerberos and Delegation token supports SASL QOP configurations
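+ * @throws SQLException if the SASL QOP parameter is invalid or the secure connection cannot be created
+ * @throws TTransportException if connecting to the peer fails; propagated so that openTransport can retry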
*/
- private TTransport createBinaryTransport() throws SQLException {
+ private TTransport createBinaryTransport() throws SQLException, TTransportException {
try {
// handle secure connection if specified
- if (!HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE))) {
+ if (!JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))) {
// If Kerberos
Map<String, String> saslProps = new HashMap<String, String>();
SaslQOP saslQOP = SaslQOP.AUTH;
- if (sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL)) {
- if (sessConfMap.containsKey(HIVE_AUTH_QOP)) {
+ if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL)) {
+ if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_QOP)) {
try {
- saslQOP = SaslQOP.fromString(sessConfMap.get(HIVE_AUTH_QOP));
+ saslQOP = SaslQOP.fromString(sessConfMap.get(JdbcConnectionParams.AUTH_QOP));
} catch (IllegalArgumentException e) {
- throw new SQLException("Invalid " + HIVE_AUTH_QOP +
+ throw new SQLException("Invalid " + JdbcConnectionParams.AUTH_QOP +
" parameter. " + e.getMessage(), "42000", e);
}
}
saslProps.put(Sasl.QOP, saslQOP.toString());
saslProps.put(Sasl.SERVER_AUTH, "true");
- boolean assumeSubject = HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap.get(HIVE_AUTH_KERBEROS_AUTH_TYPE));
+ boolean assumeSubject = JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
+ .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
transport = KerberosSaslHelper.getKerberosTransport(
- sessConfMap.get(HIVE_AUTH_PRINCIPAL), host,
- HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps, assumeSubject);
+ sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host,
+ HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps,
+ assumeSubject);
} else {
// If there's a delegation token available then use token based connection
String tokenStr = getClientDelegationToken(sessConfMap);
@@ -349,10 +370,15 @@ public class HiveConnection implements j
// we are using PLAIN Sasl connection with user/password
String userName = getUserName();
String passwd = getPassword();
+ // Note: Thrift returns an SSL socket that is already bound to the specified host:port
+ // Therefore an open called on this would be a no-op later
+ // Hence, any TTransportException related to connecting with the peer is thrown here.
+ // Bubbling them up the call hierarchy so that a retry can happen in openTransport,
+ // if dynamic service discovery is configured.
if (isSslConnection()) {
// get SSL socket
- String sslTrustStore = sessConfMap.get(HIVE_SSL_TRUST_STORE);
- String sslTrustStorePassword = sessConfMap.get(HIVE_SSL_TRUST_STORE_PASSWORD);
+ String sslTrustStore = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE);
+ String sslTrustStorePassword = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
if (sslTrustStore == null || sslTrustStore.isEmpty()) {
transport = HiveAuthFactory.getSSLSocket(host, port, loginTimeout);
} else {
@@ -373,10 +399,7 @@ public class HiveConnection implements j
}
} catch (SaslException e) {
throw new SQLException("Could not create secure connection to "
- + jdbcURI + ": " + e.getMessage(), " 08S01", e);
- } catch (TTransportException e) {
- throw new SQLException("Could not create connection to "
- + jdbcURI + ": " + e.getMessage(), " 08S01", e);
+ + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
}
return transport;
}
@@ -385,7 +408,7 @@ public class HiveConnection implements j
private String getClientDelegationToken(Map<String, String> jdbcConnConf)
throws SQLException {
String tokenStr = null;
- if (HIVE_AUTH_TOKEN.equalsIgnoreCase(jdbcConnConf.get(HIVE_AUTH_TYPE))) {
+ if (JdbcConnectionParams.AUTH_TOKEN.equalsIgnoreCase(jdbcConnConf.get(JdbcConnectionParams.AUTH_TYPE))) {
// check delegation token in job conf if any
try {
tokenStr = ShimLoader.getHadoopShims().
@@ -397,7 +420,7 @@ public class HiveConnection implements j
return tokenStr;
}
- private void openSession(Utils.JdbcConnectionParams connParams) throws SQLException {
+ private void openSession() throws SQLException {
TOpenSessionReq openReq = new TOpenSessionReq();
Map<String, String> openConf = new HashMap<String, String>();
@@ -433,7 +456,7 @@ public class HiveConnection implements j
} catch (TException e) {
LOG.error("Error opening session", e);
throw new SQLException("Could not establish connection to "
- + jdbcURI + ": " + e.getMessage(), " 08S01", e);
+ + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
}
isClosed = false;
}
@@ -442,27 +465,27 @@ public class HiveConnection implements j
* @return username from sessConfMap
*/
private String getUserName() {
- return getSessionValue(HIVE_AUTH_USER, HIVE_ANONYMOUS_USER);
+ return getSessionValue(JdbcConnectionParams.AUTH_USER, JdbcConnectionParams.ANONYMOUS_USER);
}
/**
* @return password from sessConfMap
*/
private String getPassword() {
- return getSessionValue(HIVE_AUTH_PASSWD, HIVE_ANONYMOUS_PASSWD);
+ return getSessionValue(JdbcConnectionParams.AUTH_PASSWD, JdbcConnectionParams.ANONYMOUS_PASSWD);
}
private boolean isSslConnection() {
- return "true".equalsIgnoreCase(sessConfMap.get(HIVE_USE_SSL));
+ return "true".equalsIgnoreCase(sessConfMap.get(JdbcConnectionParams.USE_SSL));
}
private boolean isKerberosAuthMode() {
- return !HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE))
- && sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL);
+ return !JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))
+ && sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL);
}
private boolean isHttpTransportMode() {
- String transportMode = hiveConfMap.get(HIVE_SERVER2_TRANSPORT_MODE);
+ String transportMode = hiveConfMap.get(JdbcConnectionParams.TRANSPORT_MODE);
if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
return true;
}
Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java Tue Sep 16 19:40:51 2014
@@ -230,7 +230,12 @@ public class HiveDriver implements Drive
throw new SQLException("Invalid connection url: " + url);
}
- JdbcConnectionParams params = Utils.parseURL(url);
+ JdbcConnectionParams params = null;
+ try {
+ params = Utils.parseURL(url);
+ } catch (ZooKeeperHiveClientException e) {
+ throw new SQLException(e);
+ }
String host = params.getHost();
if (host == null){
host = "";
@@ -239,7 +244,7 @@ public class HiveDriver implements Drive
if(host.equals("")){
port = "";
}
- else if(port.equals("0")){
+ else if(port.equals("0") || port.equals("-1")){
port = Utils.DEFAULT_PORT;
}
String db = params.getDbName();
Added: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java?rev=1625363&view=auto
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java (added)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java Tue Sep 16 19:40:51 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.sql.SQLException;
+
+public class JdbcUriParseException extends SQLException {
+
+ private static final long serialVersionUID = 0;
+
+ /**
+ * Creates a JDBC URI parse exception that only wraps an underlying cause.
+ *
+ * @param cause (original exception)
+ */
+ public JdbcUriParseException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a JDBC URI parse exception with a message describing what is wrong with the URI
+ * (for example a missing prefix or a property that was given more than one value).
+ *
+ * @param msg (exception message)
+ */
+ public JdbcUriParseException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a JDBC URI parse exception with both a descriptive message and the underlying cause.
+ *
+ * @param msg (exception message)
+ * @param cause (original exception)
+ */
+ public JdbcUriParseException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+}
Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java Tue Sep 16 19:40:51 2014
@@ -19,17 +19,23 @@
package org.apache.hive.jdbc;
import java.net.URI;
+import java.net.URISyntaxException;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.thrift.TStatus;
import org.apache.hive.service.cli.thrift.TStatusCode;
public class Utils {
+ public static final Log LOG = LogFactory.getLog(Utils.class.getName());
/**
* The required prefix for the connection URL.
*/
@@ -47,14 +53,58 @@ public class Utils {
private static final String URI_JDBC_PREFIX = "jdbc:";
+ private static final String URI_HIVE_PREFIX = "hive2:";
+
public static class JdbcConnectionParams {
+ // Note on client side parameter naming convention:
+ // Prefer using a shorter camelCase param name instead of using the same name as the
+ // corresponding
+ // HiveServer2 config.
+ // For a jdbc url: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list,
+ // client side params are specified in sess_var_list
+
+ // Client param names:
+ static final String AUTH_TYPE = "auth";
+ static final String AUTH_QOP = "sasl.qop";
+ static final String AUTH_SIMPLE = "noSasl";
+ static final String AUTH_TOKEN = "delegationToken";
+ static final String AUTH_USER = "user";
+ static final String AUTH_PRINCIPAL = "principal";
+ static final String AUTH_PASSWD = "password";
+ static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
+ static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
+ static final String ANONYMOUS_USER = "anonymous";
+ static final String ANONYMOUS_PASSWD = "anonymous";
+ static final String USE_SSL = "ssl";
+ static final String SSL_TRUST_STORE = "sslTrustStore";
+ static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
+ static final String TRANSPORT_MODE = "hive.server2.transport.mode";
+ static final String HTTP_PATH = "hive.server2.thrift.http.path";
+ static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
+ // Don't use dynamic service discovery
+ static final String SERVICE_DISCOVERY_MODE_NONE = "none";
+ // Use ZooKeeper for indirection while using dynamic service discovery
+ static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
+ static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
+
+ // Non-configurable params:
+ // ZOOKEEPER_SESSION_TIMEOUT is not exposed as client configurable
+ static final int ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000;
+ // Currently supports JKS keystore format
+ static final String SSL_TRUST_STORE_TYPE = "JKS";
+
private String host = null;
private int port;
+ private String jdbcUriString;
private String dbName = DEFAULT_DATABASE;
private Map<String,String> hiveConfs = new LinkedHashMap<String,String>();
private Map<String,String> hiveVars = new LinkedHashMap<String,String>();
private Map<String,String> sessionVars = new LinkedHashMap<String,String>();
private boolean isEmbeddedMode = false;
+ private String[] authorityList;
+ private String zooKeeperEnsemble = null;
+ private String currentHostZnodePath;
+ private List<String> rejectedHostZnodePaths = new ArrayList<String>();
public JdbcConnectionParams() {
}
@@ -62,46 +112,94 @@ public class Utils {
public String getHost() {
return host;
}
+
public int getPort() {
return port;
}
+
+ public String getJdbcUriString() {
+ return jdbcUriString;
+ }
+
public String getDbName() {
return dbName;
}
+
public Map<String, String> getHiveConfs() {
return hiveConfs;
}
- public Map<String,String> getHiveVars() {
+
+ public Map<String, String> getHiveVars() {
return hiveVars;
}
+
public boolean isEmbeddedMode() {
return isEmbeddedMode;
}
+
public Map<String, String> getSessionVars() {
return sessionVars;
}
+ public String[] getAuthorityList() {
+ return authorityList;
+ }
+
+ public String getZooKeeperEnsemble() {
+ return zooKeeperEnsemble;
+ }
+
+ public List<String> getRejectedHostZnodePaths() {
+ return rejectedHostZnodePaths;
+ }
+
+ public String getCurrentHostZnodePath() {
+ return currentHostZnodePath;
+ }
+
public void setHost(String host) {
this.host = host;
}
+
public void setPort(int port) {
this.port = port;
}
+
+ public void setJdbcUriString(String jdbcUriString) {
+ this.jdbcUriString = jdbcUriString;
+ }
+
public void setDbName(String dbName) {
this.dbName = dbName;
}
+
public void setHiveConfs(Map<String, String> hiveConfs) {
this.hiveConfs = hiveConfs;
}
- public void setHiveVars(Map<String,String> hiveVars) {
+
+ public void setHiveVars(Map<String, String> hiveVars) {
this.hiveVars = hiveVars;
}
+
public void setEmbeddedMode(boolean embeddedMode) {
this.isEmbeddedMode = embeddedMode;
}
+
public void setSessionVars(Map<String, String> sessionVars) {
this.sessionVars = sessionVars;
}
+
+ public void setSuppliedAuthorityList(String[] authorityList) {
+ this.authorityList = authorityList;
+ }
+
+ public void setZooKeeperEnsemble(String zooKeeperEnsemble) {
+ this.zooKeeperEnsemble = zooKeeperEnsemble;
+ }
+
+ public void setCurrentHostZnodePath(String currentHostZnodePath) {
+ this.currentHostZnodePath = currentHostZnodePath;
+ }
}
// Verify success or success_with_info status, else throw SQLException
@@ -124,27 +222,33 @@ public class Utils {
/**
* Parse JDBC connection URL
- * The new format of the URL is jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
- * where the optional sess, conf and var lists are semicolon separated <key>=<val> pairs. As before, if the
- * host/port is not specified, it the driver runs an embedded hive.
+ * The new format of the URL is:
+ * jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;sess_var_list?hive_conf_list#hive_var_list
+ * where the optional sess, conf and var lists are semicolon separated <key>=<val> pairs.
+ * For utilizing dynamic service discovery with HiveServer2 multiple comma separated host:port pairs can
+ * be specified as shown above.
+ * The JDBC driver resolves the list of uris and picks a specific server instance to connect to.
+ * Currently, dynamic service discovery using ZooKeeper is supported, in which case the host:port pairs represent a ZooKeeper ensemble.
+ *
+ * As before, if the host/port is not specified, the driver runs an embedded hive.
* examples -
* jdbc:hive2://ubuntu:11000/db2?hive.cli.conf.printheader=true;hive.exec.mode.local.auto.inputbytes.max=9999#stab=salesTable;icol=customerID
* jdbc:hive2://?hive.cli.conf.printheader=true;hive.exec.mode.local.auto.inputbytes.max=9999#stab=salesTable;icol=customerID
* jdbc:hive2://ubuntu:11000/db2;user=foo;password=bar
*
* Connect to http://server:10001/hs2, with specified basicAuth credentials and initial database:
- * jdbc:hive2://server:10001/db;user=foo;password=bar?hive.server2.transport.mode=http;hive.server2.thrift.http.path=hs2
- *
- * Note that currently the session properties are not used.
+ * jdbc:hive2://server:10001/db;user=foo;password=bar?hive.server2.transport.mode=http;hive.server2.thrift.http.path=hs2
*
* @param uri
* @return
+ * @throws SQLException
*/
- public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentException {
+ public static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException,
+ SQLException, ZooKeeperHiveClientException {
JdbcConnectionParams connParams = new JdbcConnectionParams();
if (!uri.startsWith(URL_PREFIX)) {
- throw new IllegalArgumentException("Bad URL format: Missing prefix " + URL_PREFIX);
+ throw new JdbcUriParseException("Bad URL format: Missing prefix " + URL_PREFIX);
}
// For URLs with no other configuration
@@ -154,29 +258,28 @@ public class Utils {
return connParams;
}
- URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length()));
-
- // Check to prevent unintentional use of embedded mode. A missing "/"
- // to separate the 'path' portion of URI can result in this.
- // The missing "/" common typo while using secure mode, eg of such url -
- // jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM
- if((jdbcURI.getAuthority() != null) && (jdbcURI.getHost()==null)) {
- throw new IllegalArgumentException("Bad URL format. Hostname not found "
- + " in authority part of the url: " + jdbcURI.getAuthority()
- + ". Are you missing a '/' after the hostname ?");
- }
-
- connParams.setHost(jdbcURI.getHost());
- if (connParams.getHost() == null) {
+ // The JDBC URI now supports specifying multiple host:port if dynamic service discovery is
+ // configured on HiveServer2 (like: host1:port1,host2:port2,host3:port3)
+ // We'll extract the authorities (host:port combo) from the URI, extract session vars, hive
+ // confs & hive vars by parsing it as a Java URI.
+ // To parse the intermediate URI as a Java URI, we'll give a dummy authority(dummy:00000).
+ // Later, we'll substitute the dummy authority for a resolved authority.
+ String dummyAuthorityString = "dummyhost:00000";
+ String suppliedAuthorities = getAuthorities(uri, connParams);
+ if ((suppliedAuthorities == null) || (suppliedAuthorities.isEmpty())) {
+ // Given uri of the form:
+ // jdbc:hive2:///dbName;sess_var_list?hive_conf_list#hive_var_list
connParams.setEmbeddedMode(true);
} else {
- int port = jdbcURI.getPort();
- if (port == -1) {
- port = Integer.valueOf(DEFAULT_PORT);
- }
- connParams.setPort(port);
+ LOG.info("Supplied authorities: " + suppliedAuthorities);
+ String[] authorityList = suppliedAuthorities.split(",");
+ connParams.setSuppliedAuthorityList(authorityList);
+ uri = uri.replace(suppliedAuthorities, dummyAuthorityString);
}
+ // Now parse the connection uri with dummy authority
+ URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length()));
+
// key=value pattern
Pattern pattern = Pattern.compile("([^;]*)=([^;]*)[;]?");
@@ -192,12 +295,13 @@ public class Utils {
} else {
// we have dbname followed by session parameters
dbName = sessVars.substring(0, sessVars.indexOf(';'));
- sessVars = sessVars.substring(sessVars.indexOf(';')+1);
+ sessVars = sessVars.substring(sessVars.indexOf(';') + 1);
if (sessVars != null) {
Matcher sessMatcher = pattern.matcher(sessVars);
while (sessMatcher.find()) {
if (connParams.getSessionVars().put(sessMatcher.group(1), sessMatcher.group(2)) != null) {
- throw new IllegalArgumentException("Bad URL format: Multiple values for property " + sessMatcher.group(1));
+ throw new JdbcUriParseException("Bad URL format: Multiple values for property "
+ + sessMatcher.group(1));
}
}
}
@@ -225,10 +329,146 @@ public class Utils {
}
}
+ // Extract host, port
+ if (connParams.isEmbeddedMode()) {
+ // In case of embedded mode we were supplied with an empty authority.
+ // So we never substituted the authority with a dummy one.
+ connParams.setHost(jdbcURI.getHost());
+ connParams.setPort(jdbcURI.getPort());
+ } else {
+ // Else substitute the dummy authority with a resolved one.
+ // In case of dynamic service discovery using ZooKeeper, it picks a server uri from ZooKeeper
+ String resolvedAuthorityString = resolveAuthority(connParams);
+ uri = uri.replace(dummyAuthorityString, resolvedAuthorityString);
+ connParams.setJdbcUriString(uri);
+ // Create a Java URI from the resolved URI for extracting the host/port
+ URI resolvedAuthorityURI = null;
+ try {
+ resolvedAuthorityURI = new URI(null, resolvedAuthorityString, null, null, null);
+ } catch (URISyntaxException e) {
+ throw new JdbcUriParseException("Bad URL format: ", e);
+ }
+ connParams.setHost(resolvedAuthorityURI.getHost());
+ connParams.setPort(resolvedAuthorityURI.getPort());
+ }
+
return connParams;
}
/**
+ * Get the authority string from the supplied uri, which could potentially contain multiple
+ * host:port pairs.
+ * The authority is taken to be everything after the first Utils.URL_PREFIX.length() characters
+ * of the uri, up to (but not including) the first "/" that follows, or up to the end of the
+ * string when there is no such "/". The result is returned as-is (not split, not validated).
+ *
+ * @param uri the complete JDBC uri string; this method does not itself verify that it starts
+ * with Utils.URL_PREFIX
+ * @param connParams not read or modified by this method
+ * @return the raw authority portion of the uri; an empty string if nothing precedes the
+ * first "/" after the prefix
+ * @throws JdbcUriParseException declared in the signature; nothing in this method body throws it
+ */
+ private static String getAuthorities(String uri, JdbcConnectionParams connParams)
+ throws JdbcUriParseException {
+ String authorities;
+ // For a jdbc uri like: jdbc:hive2://host1:port1,host2:port2,host3:port3/
+ // Extract the uri host:port list starting after "jdbc:hive2://", till the 1st "/" or EOL
+ int fromIndex = Utils.URL_PREFIX.length();
+ int toIndex = uri.indexOf("/", fromIndex);
+ if (toIndex < 0) {
+ authorities = uri.substring(fromIndex);
+ } else {
+ authorities = uri.substring(fromIndex, uri.indexOf("/", fromIndex));
+ }
+ return authorities;
+ }
+
+ /**
+ * Get a string representing a specific host:port
+ * If the session variable JdbcConnectionParams.SERVICE_DISCOVERY_MODE is set to
+ * JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER (compared case-insensitively), the
+ * authority is looked up through ZooKeeper. Otherwise only the first entry of the supplied
+ * authority list is used; any further entries are ignored by this method.
+ *
+ * @param connParams connection parameters; its session variables and authority list must
+ * already be populated
+ * @return the authority (host:port) to connect to
+ * @throws JdbcUriParseException if no host can be extracted from the first supplied authority
+ * @throws ZooKeeperHiveClientException propagated from the ZooKeeper based resolution
+ */
+ private static String resolveAuthority(JdbcConnectionParams connParams)
+ throws JdbcUriParseException, ZooKeeperHiveClientException {
+ String serviceDiscoveryMode =
+ connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
+ if ((serviceDiscoveryMode != null)
+ && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER
+ .equalsIgnoreCase(serviceDiscoveryMode))) {
+ // Resolve using ZooKeeper
+ return resolveAuthorityUsingZooKeeper(connParams);
+ } else {
+ String authority = connParams.getAuthorityList()[0];
+ URI jdbcURI = URI.create(URI_HIVE_PREFIX + "//" + authority);
+ // Check to prevent unintentional use of embedded mode. A missing "/"
+ // to separate the 'path' portion of URI can result in this.
+ // The missing "/" is a common typo while using secure mode, eg of such url -
+ // jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM
+ if ((jdbcURI.getAuthority() != null) && (jdbcURI.getHost() == null)) {
+ throw new JdbcUriParseException("Bad URL format. Hostname not found "
+ + " in authority part of the url: " + jdbcURI.getAuthority()
+ + ". Are you missing a '/' after the hostname ?");
+ }
+ // Return the authority parsed from the 1st element of the array
+ return jdbcURI.getAuthority();
+ }
+ }
+
+ /**
+ * Read a specific host:port from ZooKeeper
+ * The authority list supplied in the JDBC uri is treated as the ZooKeeper ensemble: it is
+ * joined back into a comma separated string and stored on connParams before the lookup.
+ *
+ * @param connParams connection parameters; its zooKeeperEnsemble field is set as a side effect
+ * @return the server uri string returned by
+ * ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper
+ * @throws ZooKeeperHiveClientException propagated from the ZooKeeper lookup
+ */
+ private static String resolveAuthorityUsingZooKeeper(JdbcConnectionParams connParams)
+ throws ZooKeeperHiveClientException {
+ // Set ZooKeeper ensemble in connParams for later use
+ connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
+ return ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
+ }
+
+ /**
+ * Read the next server coordinates (host:port combo) from ZooKeeper. Ignore the znodes already
+ * explored. Also update the host, port, jdbcUriString fields of connParams.
+ * The znode recorded as the current host is first added to the rejected list, so the lookup
+ * does not return it again. The jdbcUriString is updated by textually replacing
+ * "oldHost:oldPort" with "newHost:newPort"; if that exact text does not occur in the stored
+ * uri string, the string is left unchanged.
+ *
+ * @param connParams connection parameters of the previous attempt; modified in place
+ * @throws ZooKeeperHiveClientException if the lookup fails or the returned server uri cannot
+ * be parsed as a URI authority
+ */
+ static void updateConnParamsFromZooKeeper(JdbcConnectionParams connParams)
+ throws ZooKeeperHiveClientException {
+ // Add current host to the rejected list
+ connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath());
+ // Get another HiveServer2 uri from ZooKeeper
+ String serverUriString = ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
+ // Parse serverUri to a java URI and extract host, port
+ URI serverUri = null;
+ try {
+ // Note URL_PREFIX is not a valid scheme format, therefore leaving it null in the constructor
+ // to construct a valid URI
+ serverUri = new URI(null, serverUriString, null, null, null);
+ } catch (URISyntaxException e) {
+ throw new ZooKeeperHiveClientException(e);
+ }
+ String oldServerHost = connParams.getHost();
+ int oldServerPort = connParams.getPort();
+ String newServerHost = serverUri.getHost();
+ int newServerPort = serverUri.getPort();
+ connParams.setHost(newServerHost);
+ connParams.setPort(newServerPort);
+ connParams.setJdbcUriString(connParams.getJdbcUriString().replace(
+ oldServerHost + ":" + oldServerPort, newServerHost + ":" + newServerPort));
+ }
+
+ private static String joinStringArray(String[] stringArray, String seperator) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int cur = 0, end = stringArray.length; cur < end; cur++) {
+ if (cur > 0) { // the separator goes between elements only, never before the first one
+ stringBuilder.append(seperator);
+ }
+ stringBuilder.append(stringArray[cur]);
+ }
+ return stringBuilder.toString(); // empty string for an empty array
+ }
+
+ /**
* Takes a version string delimited by '.' and '-' characters
* and returns a partial version.
*
Added: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java?rev=1625363&view=auto
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java (added)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java Tue Sep 16 19:40:51 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.jdbc;
+
+public class ZooKeeperHiveClientException extends Exception {
+
+ private static final long serialVersionUID = 0;
+
+ /**
+ * Creates a ZooKeeper client exception that only wraps an underlying cause.
+ *
+ * @param cause (original exception)
+ */
+ public ZooKeeperHiveClientException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a ZooKeeper client exception with a message describing the failure.
+ *
+ * @param msg (exception message)
+ */
+ public ZooKeeperHiveClientException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a ZooKeeper client exception with both a descriptive message and the underlying
+ * cause.
+ *
+ * @param msg (exception message)
+ * @param cause (original exception)
+ */
+ public ZooKeeperHiveClientException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+}
Added: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java?rev=1625363&view=auto
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java (added)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java Tue Sep 16 19:40:51 2014
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class ZooKeeperHiveClientHelper {
+  public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
+
+  /**
+   * A no-op watcher class
+   */
+  public static class DummyWatcher implements Watcher {
+    public void process(org.apache.zookeeper.WatchedEvent event) {
+    }
+  }
+
+  /**
+   * Resolve to a host:port by connecting to ZooKeeper and picking a host randomly.
+   * The children of the znode "/" + zooKeeperNamespace are listed, the znodes already recorded
+   * in the rejected list of connParams are removed, and one of the remaining znodes is chosen at
+   * random. The chosen znode name is stored on connParams as the current host znode path.
+   * The ZooKeeper client opened for the lookup is always closed before this method returns.
+   *
+   * @param connParams connection parameters carrying the ZooKeeper ensemble, the session
+   *          variables (for the namespace) and the list of rejected znodes
+   * @return the data stored in the chosen znode, decoded as UTF-8
+   * @throws ZooKeeperHiveClientException if ZooKeeper cannot be read, or if no untried
+   *           HiveServer2 znode is left
+   */
+  static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams)
+      throws ZooKeeperHiveClientException {
+    String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
+    String zooKeeperNamespace =
+        connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE);
+    List<String> serverHosts;
+    Random randomizer = new Random();
+    String serverNode;
+    // Declared outside the try block so that the finally block can close it
+    ZooKeeper zooKeeperClient = null;
+    // Pick a random HiveServer2 host from the ZooKeeper namespace
+    try {
+      zooKeeperClient =
+          new ZooKeeper(zooKeeperEnsemble, JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT,
+              new ZooKeeperHiveClientHelper.DummyWatcher());
+      // All the HiveServer2 host nodes that are in ZooKeeper currently
+      serverHosts = zooKeeperClient.getChildren("/" + zooKeeperNamespace, false);
+      // Remove the znodes we've already tried from this list
+      serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
+      if (serverHosts.isEmpty()) {
+        // Note: this exception is caught by the generic handler below and becomes the cause
+        // of the exception that is finally thrown
+        throw new ZooKeeperHiveClientException(
+            "Tried all existing HiveServer2 uris from ZooKeeper.");
+      }
+      // Now pick a host randomly
+      serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
+      connParams.setCurrentHostZnodePath(serverNode);
+      // Read the value from the node (UTF-8 encoded byte array) and convert it to a String
+      String serverUri =
+          new String(zooKeeperClient.getData("/" + zooKeeperNamespace + "/" + serverNode, false,
+              null), Charset.forName("UTF-8"));
+      LOG.info("Selected HiveServer2 instance with uri: " + serverUri);
+      return serverUri;
+    } catch (Exception e) {
+      throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e);
+    } finally {
+      // A new client is created on every call, so it must be closed here; otherwise each
+      // resolution attempt leaves a ZooKeeper session open
+      if (zooKeeperClient != null) {
+        try {
+          zooKeeperClient.close();
+        } catch (InterruptedException e) {
+          // Closing is best effort: keep the interrupt status and do not mask the result
+          Thread.currentThread().interrupt();
+          LOG.warn("Interrupted while closing the ZooKeeper client", e);
+        }
+      }
+    }
+  }
+
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Tue Sep 16 19:40:51 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.lockmgr
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
@@ -73,31 +74,6 @@ public class ZooKeeperHiveLockManager im
}
/**
- * @param conf The hive configuration
- * Get the quorum server address from the configuration. The format is:
- * host1:port, host2:port..
- **/
- @VisibleForTesting
- static String getQuorumServers(HiveConf conf) {
- String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(",");
- String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT);
- StringBuilder quorum = new StringBuilder();
- for(int i=0; i<hosts.length; i++) {
- quorum.append(hosts[i].trim());
- if (!hosts[i].contains(":")) {
- // if the hostname doesn't contain a port, add the configured port to hostname
- quorum.append(":");
- quorum.append(port);
- }
-
- if (i != hosts.length-1)
- quorum.append(",");
- }
-
- return quorum.toString();
- }
-
- /**
* @param ctx The lock manager context (containing the Hive configuration file)
* Start the ZooKeeper client based on the zookeeper cluster specified in the conf.
**/
@@ -105,7 +81,7 @@ public class ZooKeeperHiveLockManager im
this.ctx = ctx;
HiveConf conf = ctx.getConf();
sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
- quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf);
+ quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
sleepTime = conf.getTimeVar(
HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
@@ -146,7 +122,7 @@ public class ZooKeeperHiveLockManager im
return;
}
- zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher());
+ zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new ZooKeeperHiveHelper.DummyWatcher());
}
/**
@@ -517,8 +493,8 @@ public class ZooKeeperHiveLockManager im
ZooKeeper zkpClient = null;
try {
int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
- String quorumServers = getQuorumServers(conf);
- Watcher dummyWatcher = new DummyWatcher();
+ String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
+ Watcher dummyWatcher = new ZooKeeperHiveHelper.DummyWatcher();
zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher);
String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
List<HiveLock> locks = getLocks(conf, zkpClient, null, parent, false, false);
@@ -629,7 +605,8 @@ public class ZooKeeperHiveLockManager im
if (fetchData) {
try {
- data = new HiveLockObjectData(new String(zkpClient.getData(curChild, new DummyWatcher(), null)));
+ data = new HiveLockObjectData(new String(zkpClient.getData(curChild,
+ new ZooKeeperHiveHelper.DummyWatcher(), null)));
data.setClientIp(clientIp);
} catch (Exception e) {
LOG.error("Error in getting data for " + curChild, e);
@@ -789,11 +766,6 @@ public class ZooKeeperHiveLockManager im
return null;
}
- public static class DummyWatcher implements Watcher {
- public void process(org.apache.zookeeper.WatchedEvent event) {
- }
- }
-
@Override
public void prepareRetry() throws LockException {
try {
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java?rev=1625363&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java Tue Sep 16 19:40:51 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+public class ZooKeeperHiveHelper {
+ public static final Log LOG = LogFactory.getLog(ZooKeeperHiveHelper.class.getName());
+ public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
+ /**
+ * Get the ensemble server addresses from the configuration. The format is:
+ * host1:port,host2:port..
+ * Each comma separated entry of HIVE_ZOOKEEPER_QUORUM is trimmed; an entry that does not
+ * contain a ":" gets ":" plus the value of HIVE_ZOOKEEPER_CLIENT_PORT appended.
+ *
+ * @param conf the Hive configuration to read the quorum hosts and the client port from
+ * @return the comma separated list of host:port entries
+ **/
+ public static String getQuorumServers(HiveConf conf) {
+ String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(",");
+ String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT);
+ StringBuilder quorum = new StringBuilder();
+ for (int i = 0; i < hosts.length; i++) {
+ quorum.append(hosts[i].trim());
+ if (!hosts[i].contains(":")) {
+ // if the hostname doesn't contain a port, add the configured port to hostname
+ quorum.append(":");
+ quorum.append(port);
+ }
+
+ if (i != hosts.length - 1)
+ quorum.append(",");
+ }
+
+ return quorum.toString();
+ }
+
+
+ /**
+ * Create a path on ZooKeeper, if it does not already exist ("mkdir -p")
+ * The path is split on ZOOKEEPER_PATH_SEPARATOR and one node is created per component, from
+ * the root downwards. Components that already exist are left as they are.
+ *
+ * @param zooKeeperClient ZooKeeper session
+ * @param path string with ZOOKEEPER_PATH_SEPARATOR as the separator
+ * @param acl list of ACL entries
+ * @param createMode for create mode of each node in the path
+ * @return the path that was walked, rebuilt from its components with a leading separator
+ * @throws KeeperException for any ZooKeeper failure other than a node that already exists
+ * @throws InterruptedException
+ */
+ public static String createPathRecursively(ZooKeeper zooKeeperClient, String path, List<ACL> acl,
+ CreateMode createMode) throws KeeperException, InterruptedException {
+ String[] pathComponents = StringUtils.splitByWholeSeparator(path, ZOOKEEPER_PATH_SEPARATOR);
+ String currentPath = "";
+ for (String pathComponent : pathComponents) {
+ currentPath += ZOOKEEPER_PATH_SEPARATOR + pathComponent;
+ try {
+ String node = zooKeeperClient.create(currentPath, new byte[0], acl, createMode);
+ LOG.info("Created path: " + node);
+ } catch (KeeperException.NodeExistsException e) {
+ // Do nothing here: the node is already present, which is the desired end state
+ }
+ }
+ return currentPath;
+ }
+
+ /**
+ * A no-op watcher class
+ */
+ public static class DummyWatcher implements Watcher {
+ public void process(org.apache.zookeeper.WatchedEvent event) {
+ }
+ }
+
+}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java Tue Sep 16 19:40:51 2014
@@ -25,6 +25,7 @@ import java.util.Collections;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;
@@ -87,14 +88,14 @@ public class TestZookeeperLockManager {
public void testGetQuorumServers() {
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1");
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
- Assert.assertEquals("node1:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+ Assert.assertEquals("node1:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1,node2,node3");
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
- Assert.assertEquals("node1:9999,node2:9999,node3:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+ Assert.assertEquals("node1:9999,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1:5666,node2,node3");
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
- Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+ Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Tue Sep 16 19:40:51 2014
@@ -66,7 +66,7 @@ public class CLIService extends Composit
private UserGroupInformation httpUGI;
public CLIService() {
- super("CLIService");
+ super(CLIService.class.getSimpleName());
}
@Override
@@ -201,8 +201,7 @@ public class CLIService extends Composit
* @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle)
*/
@Override
- public void closeSession(SessionHandle sessionHandle)
- throws HiveSQLException {
+ public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
sessionManager.closeSession(sessionHandle);
LOG.debug(sessionHandle + ": closeSession()");
}
@@ -470,4 +469,8 @@ public class CLIService extends Composit
sessionManager.getSession(sessionHandle).renewDelegationToken(authFactory, tokenStr);
LOG.info(sessionHandle + ": renewDelegationToken()");
}
+
+ public SessionManager getSessionManager() {
+ return sessionManager;
+ }
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Sep 16 19:40:51 2014
@@ -47,7 +47,7 @@ public class OperationManager extends Ab
new HashMap<OperationHandle, Operation>();
public OperationManager() {
- super("OperationManager");
+ super(OperationManager.class.getSimpleName());
}
@Override
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Sep 16 19:40:51 2014
@@ -166,8 +166,8 @@ public class HiveSessionImpl implements
IHiveFileProcessor processor = new GlobalHivercFileProcessor();
try {
- if (hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION) != null) {
- String hiverc = hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION)
+ if (hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION) != null) {
+ String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION)
+ File.separator + SessionManager.HIVERCFILE;
if (new File(hiverc).exists()) {
LOG.info("Running global init file: " + hiverc);
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Sep 16 19:40:51 2014
@@ -67,7 +67,7 @@ public class SessionManager extends Comp
private volatile boolean shutdown;
public SessionManager() {
- super("SessionManager");
+ super(SessionManager.class.getSimpleName());
}
@Override
@@ -356,5 +356,9 @@ public class SessionManager extends Comp
return backgroundOperationPool.submit(r);
}
+ public int getOpenSessionCount() {
+ return handleToSession.size();
+ }
+
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Tue Sep 16 19:40:51 2014
@@ -18,7 +18,6 @@
package org.apache.hive.service.cli.thrift;
-import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -40,72 +39,54 @@ import org.apache.thrift.transport.TTran
public class ThriftBinaryCLIService extends ThriftCLIService {
public ThriftBinaryCLIService(CLIService cliService) {
- super(cliService, "ThriftBinaryCLIService");
+ super(cliService, ThriftBinaryCLIService.class.getSimpleName());
}
@Override
public void run() {
try {
- hiveAuthFactory = new HiveAuthFactory(hiveConf);
- TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
- TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
-
- String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
- if (portString != null) {
- portNum = Integer.valueOf(portString);
- } else {
- portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
- }
-
- String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
- if (hiveHost == null) {
- hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
- }
-
- if (hiveHost != null && !hiveHost.isEmpty()) {
- serverAddress = new InetSocketAddress(hiveHost, portNum);
- } else {
- serverAddress = new InetSocketAddress(portNum);
- }
-
- minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
- maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
- workerKeepAliveTime = hiveConf.getTimeVar(
- ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ // Server thread pool
String threadPoolName = "HiveServer2-Handler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
+ // Thrift configs
+ hiveAuthFactory = new HiveAuthFactory(hiveConf);
+ TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
+ TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
TServerSocket serverSocket = null;
if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
} else {
String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
if (keyStorePath.isEmpty()) {
- throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname +
- " Not configured for SSL connection");
+ throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+ + " Not configured for SSL connection");
}
String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
- serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum,
- keyStorePath, keyStorePassword);
+ serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
+ keyStorePassword);
}
+
+ // Server args
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
- .processorFactory(processorFactory)
- .transportFactory(transportFactory)
- .protocolFactory(new TBinaryProtocol.Factory())
- .executorService(executorService);
+ .processorFactory(processorFactory).transportFactory(transportFactory)
+ .protocolFactory(new TBinaryProtocol.Factory()).executorService(executorService);
+ // TCP Server
server = new TThreadPoolServer(sargs);
-
- LOG.info("ThriftBinaryCLIService listening on " + serverAddress);
-
server.serve();
-
+ String msg = "Started " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
+ LOG.info(msg);
} catch (Throwable t) {
- LOG.error("Error: ", t);
+ LOG.fatal(
+ "Error starting HiveServer2: could not start "
+ + ThriftBinaryCLIService.class.getSimpleName(), t);
+ System.exit(-1);
}
-
}
+
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Sep 16 19:40:51 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
@@ -34,6 +35,7 @@ import org.apache.hive.service.auth.Hive
import org.apache.hive.service.auth.TSetIpAddressProcessor;
import org.apache.hive.service.cli.*;
import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.server.HiveServer2;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
@@ -48,9 +50,11 @@ public abstract class ThriftCLIService e
protected CLIService cliService;
private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
private static final TStatus ERROR_STATUS = new TStatus(TStatusCode.ERROR_STATUS);
+ protected static HiveAuthFactory hiveAuthFactory;
protected int portNum;
protected InetSocketAddress serverAddress;
+ protected String hiveHost;
protected TServer server;
protected org.eclipse.jetty.server.Server httpServer;
@@ -62,8 +66,7 @@ public abstract class ThriftCLIService e
protected int minWorkerThreads;
protected int maxWorkerThreads;
protected long workerKeepAliveTime;
-
- protected static HiveAuthFactory hiveAuthFactory;
+ private HiveServer2 hiveServer2;
public ThriftCLIService(CLIService cliService, String serviceName) {
super(serviceName);
@@ -73,6 +76,43 @@ public abstract class ThriftCLIService e
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
+
+ // Initialize common server configs needed in both binary & http modes
+ String portString;
+ hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+ if (hiveHost == null) {
+ hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+ }
+ // HTTP mode
+ if (HiveServer2.isHTTPTransportMode(hiveConf)) {
+ workerKeepAliveTime =
+ hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
+ TimeUnit.SECONDS);
+ portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
+ }
+ }
+ // Binary mode
+ else {
+ workerKeepAliveTime =
+ hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ }
+ }
+ if (hiveHost != null && !hiveHost.isEmpty()) {
+ serverAddress = new InetSocketAddress(hiveHost, portNum);
+ } else {
+ serverAddress = new InetSocketAddress(portNum);
+ }
+ minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+ maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
super.init(hiveConf);
}
@@ -105,6 +145,14 @@ public abstract class ThriftCLIService e
super.stop();
}
+ public int getPortNumber() {
+ return portNum;
+ }
+
+ public InetSocketAddress getServerAddress() {
+ return serverAddress;
+ }
+
@Override
public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
throws TException {
@@ -308,6 +356,24 @@ public abstract class ThriftCLIService e
} catch (Exception e) {
LOG.warn("Error closing session: ", e);
resp.setStatus(HiveSQLException.toTStatus(e));
+ } finally {
+ if (!(isEmbedded) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY))
+ && (!hiveServer2.isRegisteredWithZooKeeper())) {
+ // Asynchronously shutdown this instance of HiveServer2,
+ // if there are no active client sessions
+ if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+ LOG.info("This instance of HiveServer2 has been removed from the list of server "
+ + "instances available for dynamic service discovery. "
+ + "The last client session has ended - will shutdown now.");
+ Thread shutdownThread = new Thread() {
+ @Override
+ public void run() {
+ hiveServer2.stop();
+ }
+ };
+ shutdownThread.start();
+ }
+ }
}
return resp;
}
@@ -591,5 +657,9 @@ public abstract class ThriftCLIService e
.equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
}
+ public void setHiveServer2(HiveServer2 hiveServer2) {
+ this.hiveServer2 = hiveServer2;
+ }
+
}