You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2014/09/16 21:40:52 UTC

svn commit: r1625363 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ itests/hive-unit/src/test/java/org/apache/hive/beeline/ itests/hive-unit/src/test/java/org/apache/hive/jdbc/ jdbc/ jdbc/src/java/org/apache/hive/jdbc/ ql/src/java...

Author: vgumashta
Date: Tue Sep 16 19:40:51 2014
New Revision: 1625363

URL: http://svn.apache.org/r1625363
Log:
HIVE-7935: Support dynamic service discovery for HiveServer2 (Vaibhav Gumashta reviewed by Thejas Nair, Alan Gates)

Added:
    hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java
    hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java
    hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
    hive/trunk/jdbc/pom.xml
    hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
    hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
    hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
    hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java
    hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Sep 16 19:40:51 2014
@@ -205,7 +205,7 @@ public class HiveConf extends Configurat
     PLAN_SERIALIZATION("hive.plan.serialization.format", "kryo",
         "Query plan format serialization between client and task nodes. \n" +
         "Two supported values are : kryo and javaXML. Kryo is default."),
-    SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive", 
+    SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive",
         "HDFS root scratch dir for Hive jobs which gets created with 777 permission. " +
         "For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, " +
         "with ${hive.scratch.dir.permission}."),
@@ -215,7 +215,7 @@ public class HiveConf extends Configurat
     DOWNLOADED_RESOURCES_DIR("hive.downloaded.resources.dir",
         "${system:java.io.tmpdir}" + File.separator + "${hive.session.id}_resources",
         "Temporary local directory for added resources in the remote file system."),
-    SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700", 
+    SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700",
         "The permission for the user specific scratch directories that get created."),
     SUBMITVIACHILD("hive.exec.submitviachild", false, ""),
     SUBMITLOCALTASKVIACHILD("hive.exec.submit.local.task.via.child", true,
@@ -1243,10 +1243,16 @@ public class HiveConf extends Configurat
         "This param is to control whether or not only do lock on queries\n" +
         "that need to execute at least one mapred job."),
 
+     // Zookeeper related configs
     HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "",
-        "The list of ZooKeeper servers to talk to. This is only needed for read/write locks."),
+        "List of ZooKeeper servers to talk to. This is needed for: " +
+        "1. Read/write locks - when hive.lock.manager is set to " +
+        "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, " +
+        "2. When HiveServer2 supports service discovery via Zookeeper."),
     HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181",
-        "The port of ZooKeeper servers to talk to. This is only needed for read/write locks."),
+        "The port of ZooKeeper servers to talk to. " +
+        "If the list of Zookeeper servers specified in hive.zookeeper.quorum," +
+        "does not contain port numbers, this value is used."),
     HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000,
         "ZooKeeper client's session timeout. The client is disconnected, and as a result, all locks released, \n" +
         "if a heartbeat is not sent in the timeout."),
@@ -1446,11 +1452,6 @@ public class HiveConf extends Configurat
         "If the property is set, the value must be a valid URI (java.net.URI, e.g. \"file:///tmp/my-logging.properties\"), \n" +
         "which you can then extract a URL from and pass to PropertyConfigurator.configure(URL)."),
 
-    // Hive global init file location
-    HIVE_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
-        "The location of HS2 global init file (.hiverc).\n" +
-        "If the property is reset, the value must be a valid path where the init file is located."),
-
     // prefix used to auto generated column aliases (this should be started with '_')
     HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL("hive.autogen.columnalias.prefix.label", "_c",
         "String used as a prefix when auto generating column alias.\n" +
@@ -1489,16 +1490,29 @@ public class HiveConf extends Configurat
         "table. From 0.12 onwards, they are displayed separately. This flag will let you\n" +
         "get old behavior, if desired. See, test-case in patch for HIVE-6689."),
 
+     // HiveServer2 specific configs
     HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null),
-        "This number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds between retries. \n" +
-        "The default of 30 will keep trying for 30 minutes."),
-
+        "Number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds " +
+        "between retries. \n The default of 30 will keep trying for 30 minutes."),
+    HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY("hive.server2.support.dynamic.service.discovery", false,
+        "Whether HiveServer2 supports dynamic service discovery for its clients. " +
+        "To support this, each instance of HiveServer2 currently uses ZooKeeper to register itself, " +
+        "when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: " +
+        "hive.zookeeper.quorum in their connection string."),
+    HIVE_SERVER2_ZOOKEEPER_NAMESPACE("hive.server2.zookeeper.namespace", "hiveserver2",
+        "The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."),
+    // HiveServer2 global init file location
+    HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
+        "The location of HS2 global init file (.hiverc).\n" +
+        "If the property is reset, the value must be a valid path where the init file is located."),
     HIVE_SERVER2_TRANSPORT_MODE("hive.server2.transport.mode", "binary", new StringSet("binary", "http"),
         "Transport mode of HiveServer2."),
+    HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
+        "Bind host on which to run the HiveServer2 Thrift service."),
 
     // http (over thrift) transport settings
     HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
-        "Port number when in HTTP mode."),
+        "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'http'."),
     HIVE_SERVER2_THRIFT_HTTP_PATH("hive.server2.thrift.http.path", "cliservice",
         "Path component of URL endpoint when in HTTP mode."),
     HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS("hive.server2.thrift.http.min.worker.threads", 5,
@@ -1515,11 +1529,7 @@ public class HiveConf extends Configurat
 
     // binary transport settings
     HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000,
-        "Port number of HiveServer2 Thrift interface.\n" +
-        "Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT"),
-    HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
-        "Bind host on which to run the HiveServer2 Thrift interface.\n" +
-        "Can be overridden by setting $HIVE_SERVER2_THRIFT_BIND_HOST"),
+        "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'binary'."),
     // hadoop.rpc.protection being set to a higher level than HiveServer2
     // does not make sense in most situations.
     // HiveServer2 ignores hadoop.rpc.protection in favor of hive.server2.thrift.sasl.qop.

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java Tue Sep 16 19:40:51 2014
@@ -210,7 +210,7 @@ public class TestBeeLineWithArgs {
     }
     scriptFile.delete();
   }
-  
+
   /**
    * Test that BeeLine will read comment lines that start with whitespace
    * @throws Throwable

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java Tue Sep 16 19:40:51 2014
@@ -262,10 +262,9 @@ public class TestJdbcDriver2 {
   private void checkBadUrl(String url) throws SQLException {
     try{
       DriverManager.getConnection(url, "", "");
-      fail("should have thrown IllegalArgumentException but did not ");
-    } catch(SQLException i) {
-      assertTrue(i.getMessage().contains("Bad URL format. Hostname not found "
-          + " in authority part of the url"));
+      fail("Should have thrown JdbcUriParseException but did not ");
+    } catch(JdbcUriParseException e) {
+      assertTrue(e.getMessage().contains("Bad URL format"));
     }
   }
 
@@ -1618,6 +1617,10 @@ public class TestJdbcDriver2 {
   // [url] [host] [port] [db]
   private static final String[][] URL_PROPERTIES = new String[][] {
     // binary mode
+    // For embedded mode, the JDBC uri is of the form:
+    // jdbc:hive2:///dbName;sess_var_list?hive_conf_list#hive_var_list
+    // and does not contain host:port string.
+    // As a result port is parsed to '-1' per the Java URI conventions
     {"jdbc:hive2://", "", "", "default"},
     {"jdbc:hive2://localhost:10001/default", "localhost", "10001", "default"},
     {"jdbc:hive2://localhost/notdefault", "localhost", "10000", "notdefault"},
@@ -1654,7 +1657,8 @@ public class TestJdbcDriver2 {
   };
 
   @Test
-  public void testParseUrlHttpMode() throws SQLException {
+  public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException,
+      ZooKeeperHiveClientException {
     new HiveDriver();
     for (String[] testValues : HTTP_URL_PROPERTIES) {
       JdbcConnectionParams params = Utils.parseURL(testValues[0]);

Modified: hive/trunk/jdbc/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/pom.xml?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/jdbc/pom.xml (original)
+++ hive/trunk/jdbc/pom.xml Tue Sep 16 19:40:51 2014
@@ -80,6 +80,17 @@
       <artifactId>libthrift</artifactId>
       <version>${libthrift.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+            <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <profiles>

Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java Tue Sep 16 19:40:51 2014
@@ -53,6 +53,7 @@ import javax.security.sasl.SaslException
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.KerberosSaslHelper;
 import org.apache.hive.service.auth.PlainSaslHelper;
@@ -86,37 +87,20 @@ import org.apache.thrift.transport.TTran
  */
 public class HiveConnection implements java.sql.Connection {
   public static final Log LOG = LogFactory.getLog(HiveConnection.class.getName());
-  private static final String HIVE_AUTH_TYPE= "auth";
-  private static final String HIVE_AUTH_QOP = "sasl.qop";
-  private static final String HIVE_AUTH_SIMPLE = "noSasl";
-  private static final String HIVE_AUTH_TOKEN = "delegationToken";
-  private static final String HIVE_AUTH_USER = "user";
-  private static final String HIVE_AUTH_PRINCIPAL = "principal";
-  private static final String HIVE_AUTH_PASSWD = "password";
-  private static final String HIVE_AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
-  private static final String HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
-  private static final String HIVE_ANONYMOUS_USER = "anonymous";
-  private static final String HIVE_ANONYMOUS_PASSWD = "anonymous";
-  private static final String HIVE_USE_SSL = "ssl";
-  private static final String HIVE_SSL_TRUST_STORE = "sslTrustStore";
-  private static final String HIVE_SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
-  private static final String HIVE_SERVER2_TRANSPORT_MODE = "hive.server2.transport.mode";
-  private static final String HIVE_SERVER2_THRIFT_HTTP_PATH = "hive.server2.thrift.http.path";
   private static final String HIVE_VAR_PREFIX = "hivevar:";
   private static final String HIVE_CONF_PREFIX = "hiveconf:";
-  // Currently supports JKS keystore format
-  // See HIVE-6286 (Add support for PKCS12 keystore format)
-  private static final String HIVE_SSL_TRUST_STORE_TYPE = "JKS";
-
-  private final String jdbcURI;
-  private final String host;
-  private final int port;
+
+  private String jdbcUriString;
+  private String host;
+  private int port;
   private final Map<String, String> sessConfMap;
   private final Map<String, String> hiveConfMap;
   private final Map<String, String> hiveVarMap;
+  private JdbcConnectionParams connParams;
   private final boolean isEmbeddedMode;
   private TTransport transport;
-  private TCLIService.Iface client;   // todo should be replaced by CliServiceClient
+  // TODO should be replaced by CliServiceClient
+  private TCLIService.Iface client;
   private boolean isClosed = true;
   private SQLWarning warningChain = null;
   private TSessionHandle sessHandle = null;
@@ -126,14 +110,12 @@ public class HiveConnection implements j
 
   public HiveConnection(String uri, Properties info) throws SQLException {
     setupLoginTimeout();
-    jdbcURI = uri;
-    // parse the connection uri
-    Utils.JdbcConnectionParams connParams;
     try {
       connParams = Utils.parseURL(uri);
-    } catch (IllegalArgumentException e) {
+    } catch (ZooKeeperHiveClientException e) {
       throw new SQLException(e);
     }
+    jdbcUriString = connParams.getJdbcUriString();
     // extract parsed connection parameters:
     // JDBC URL: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
     // each list: <key1>=<val1>;<key2>=<val2> and so on
@@ -164,14 +146,14 @@ public class HiveConnection implements j
     } else {
       // extract user/password from JDBC connection properties if its not supplied in the
       // connection URL
-      if (info.containsKey(HIVE_AUTH_USER)) {
-        sessConfMap.put(HIVE_AUTH_USER, info.getProperty(HIVE_AUTH_USER));
-        if (info.containsKey(HIVE_AUTH_PASSWD)) {
-          sessConfMap.put(HIVE_AUTH_PASSWD, info.getProperty(HIVE_AUTH_PASSWD));
+      if (info.containsKey(JdbcConnectionParams.AUTH_USER)) {
+        sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER));
+        if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) {
+          sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD));
         }
       }
-      if (info.containsKey(HIVE_AUTH_TYPE)) {
-        sessConfMap.put(HIVE_AUTH_TYPE, info.getProperty(HIVE_AUTH_TYPE));
+      if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) {
+        sessConfMap.put(JdbcConnectionParams.AUTH_TYPE, info.getProperty(JdbcConnectionParams.AUTH_TYPE));
       }
       // open the client transport
       openTransport();
@@ -189,19 +171,44 @@ public class HiveConnection implements j
     supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
 
     // open client session
-    openSession(connParams);
+    openSession();
   }
 
   private void openTransport() throws SQLException {
-    // TODO: Refactor transport creation to a factory, it's getting uber messy here
-    transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
-    try {
-      if (!transport.isOpen()) {
-        transport.open();
+    while (true) {
+      try {
+        transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
+        if (!transport.isOpen()) {
+          LOG.info("Will try to open client transport with JDBC Uri: " + jdbcUriString);
+          transport.open();
+        }
+        break;
+      } catch (TTransportException e) {
+        LOG.info("Could not open client transport with JDBC Uri: " + jdbcUriString);
+        // We'll retry till we exhaust all HiveServer2 uris from ZooKeeper
+        if ((sessConfMap.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE) != null)
+            && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(sessConfMap
+                .get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE)))) {
+          try {
+            // Update jdbcUriString, host & port variables in connParams
+            // Throw an exception if all HiveServer2 uris have been exhausted,
+            // or if we're unable to connect to ZooKeeper.
+            Utils.updateConnParamsFromZooKeeper(connParams);
+          } catch (ZooKeeperHiveClientException ze) {
+            throw new SQLException(
+                "Could not open client transport for any of the Server URI's in ZooKeeper: "
+                    + ze.getMessage(), " 08S01", ze);
+          }
+          // Update with new values
+          jdbcUriString = connParams.getJdbcUriString();
+          host = connParams.getHost();
+          port = connParams.getPort();
+          LOG.info("Will retry opening client transport");
+        } else {
+          throw new SQLException("Could not open client transport with JDBC Uri: " + jdbcUriString
+              + ": " + e.getMessage(), " 08S01", e);
+        }
       }
-    } catch (TTransportException e) {
-      throw new SQLException("Could not open connection to "
-          + jdbcURI + ": " + e.getMessage(), " 08S01", e);
     }
   }
 
@@ -211,37 +218,36 @@ public class HiveConnection implements j
     String schemeName = useSsl ? "https" : "http";
     // http path should begin with "/"
     String httpPath;
-    httpPath = hiveConfMap.get(HIVE_SERVER2_THRIFT_HTTP_PATH);
-    if(httpPath == null) {
+    httpPath = hiveConfMap.get(JdbcConnectionParams.HTTP_PATH);
+    if (httpPath == null) {
       httpPath = "/";
-    }
-    else if(!httpPath.startsWith("/")) {
+    } else if (!httpPath.startsWith("/")) {
       httpPath = "/" + httpPath;
     }
-    return schemeName +  "://" + host + ":" + port + httpPath;
+    return schemeName + "://" + host + ":" + port + httpPath;
   }
 
-  private TTransport createHttpTransport() throws SQLException {
+  private TTransport createHttpTransport() throws SQLException, TTransportException {
     DefaultHttpClient httpClient;
-
     boolean useSsl = isSslConnection();
-
     // Create an http client from the configs
-    try {
-      httpClient = getHttpClient(useSsl);
-    } catch (Exception e) {
-      String msg =  "Could not create http connection to " +
-          jdbcURI + ". " + e.getMessage();
-      throw new SQLException(msg, " 08S01", e);
-    }
-
+    httpClient = getHttpClient(useSsl);
     try {
       transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
+      // We'll call an open/close here to send a test HTTP message to the server. Any
+      // TTransportException caused by trying to connect to a non-available peer are thrown here.
+      // Bubbling them up the call hierarchy so that a retry can happen in openTransport,
+      // if dynamic service discovery is configured.
+      TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+      TOpenSessionResp openResp = client.OpenSession(new TOpenSessionReq());
+      if (openResp != null) {
+        client.CloseSession(new TCloseSessionReq(openResp.getSessionHandle()));
+      }
     }
-    catch (TTransportException e) {
+    catch (TException e) {
       String msg =  "Could not create http connection to " +
-          jdbcURI + ". " + e.getMessage();
-      throw new SQLException(msg, " 08S01", e);
+          jdbcUriString + ". " + e.getMessage();
+      throw new TTransportException(msg, e);
     }
     return transport;
   }
@@ -263,7 +269,7 @@ public class HiveConnection implements j
        * for sending to the server before every request.
        */
       requestInterceptor = new HttpKerberosRequestInterceptor(
-          sessConfMap.get(HIVE_AUTH_PRINCIPAL), host, getServerHttpUrl(false));
+          sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host, getServerHttpUrl(false));
     }
     else {
       /**
@@ -273,11 +279,23 @@ public class HiveConnection implements j
       requestInterceptor = new HttpBasicAuthInterceptor(getUserName(), getPassword());
       // Configure httpClient for SSL
       if (useSsl) {
-        String sslTrustStorePath = sessConfMap.get(HIVE_SSL_TRUST_STORE);
+        String sslTrustStorePath = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE);
         String sslTrustStorePassword = sessConfMap.get(
-            HIVE_SSL_TRUST_STORE_PASSWORD);
+            JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
         KeyStore sslTrustStore;
         SSLSocketFactory socketFactory;
+        /**
+         * The code within the try block throws:
+         * 1. SSLInitializationException
+         * 2. KeyStoreException
+         * 3. IOException
+         * 4. NoSuchAlgorithmException
+         * 5. CertificateException
+         * 6. KeyManagementException
+         * 7. UnrecoverableKeyException
+         * We don't want the client to retry on any of these, hence we catch all
+         * and throw a SQLException.
+         */
         try {
           if (sslTrustStorePath == null || sslTrustStorePath.isEmpty()) {
             // Create a default socket factory based on standard JSSE trust material
@@ -285,7 +303,7 @@ public class HiveConnection implements j
           }
           else {
             // Pick trust store config from the given path
-            sslTrustStore = KeyStore.getInstance(HIVE_SSL_TRUST_STORE_TYPE);
+            sslTrustStore = KeyStore.getInstance(JdbcConnectionParams.SSL_TRUST_STORE_TYPE);
             sslTrustStore.load(new FileInputStream(sslTrustStorePath),
                 sslTrustStorePassword.toCharArray());
             socketFactory = new SSLSocketFactory(sslTrustStore);
@@ -296,7 +314,7 @@ public class HiveConnection implements j
         }
         catch (Exception e) {
           String msg =  "Could not create an https connection to " +
-              jdbcURI + ". " + e.getMessage();
+              jdbcUriString + ". " + e.getMessage();
           throw new SQLException(msg, " 08S01", e);
         }
       }
@@ -316,29 +334,32 @@ public class HiveConnection implements j
    *   - Raw (non-SASL) socket
    *
    *   Kerberos and Delegation token supports SASL QOP configurations
+   * @throws SQLException, TTransportException
    */
-  private TTransport createBinaryTransport() throws SQLException {
+  private TTransport createBinaryTransport() throws SQLException, TTransportException {
     try {
       // handle secure connection if specified
-      if (!HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE))) {
+      if (!JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))) {
         // If Kerberos
         Map<String, String> saslProps = new HashMap<String, String>();
         SaslQOP saslQOP = SaslQOP.AUTH;
-        if (sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL)) {
-          if (sessConfMap.containsKey(HIVE_AUTH_QOP)) {
+        if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL)) {
+          if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_QOP)) {
             try {
-              saslQOP = SaslQOP.fromString(sessConfMap.get(HIVE_AUTH_QOP));
+              saslQOP = SaslQOP.fromString(sessConfMap.get(JdbcConnectionParams.AUTH_QOP));
             } catch (IllegalArgumentException e) {
-              throw new SQLException("Invalid " + HIVE_AUTH_QOP +
+              throw new SQLException("Invalid " + JdbcConnectionParams.AUTH_QOP +
                   " parameter. " + e.getMessage(), "42000", e);
             }
           }
           saslProps.put(Sasl.QOP, saslQOP.toString());
           saslProps.put(Sasl.SERVER_AUTH, "true");
-          boolean assumeSubject = HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap.get(HIVE_AUTH_KERBEROS_AUTH_TYPE));
+          boolean assumeSubject = JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
+              .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
           transport = KerberosSaslHelper.getKerberosTransport(
-              sessConfMap.get(HIVE_AUTH_PRINCIPAL), host,
-              HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps, assumeSubject);
+              sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host,
+              HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps,
+              assumeSubject);
         } else {
           // If there's a delegation token available then use token based connection
           String tokenStr = getClientDelegationToken(sessConfMap);
@@ -349,10 +370,15 @@ public class HiveConnection implements j
             // we are using PLAIN Sasl connection with user/password
             String userName = getUserName();
             String passwd = getPassword();
+            // Note: Thrift returns an SSL socket that is already bound to the specified host:port
+            // Therefore an open called on this would be a no-op later
+            // Hence, any TTransportException related to connecting with the peer are thrown here.
+            // Bubbling them up the call hierarchy so that a retry can happen in openTransport,
+            // if dynamic service discovery is configured.
             if (isSslConnection()) {
               // get SSL socket
-              String sslTrustStore = sessConfMap.get(HIVE_SSL_TRUST_STORE);
-              String sslTrustStorePassword = sessConfMap.get(HIVE_SSL_TRUST_STORE_PASSWORD);
+              String sslTrustStore = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE);
+              String sslTrustStorePassword = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
               if (sslTrustStore == null || sslTrustStore.isEmpty()) {
                 transport = HiveAuthFactory.getSSLSocket(host, port, loginTimeout);
               } else {
@@ -373,10 +399,7 @@ public class HiveConnection implements j
       }
     } catch (SaslException e) {
       throw new SQLException("Could not create secure connection to "
-          + jdbcURI + ": " + e.getMessage(), " 08S01", e);
-    } catch (TTransportException e) {
-      throw new SQLException("Could not create connection to "
-          + jdbcURI + ": " + e.getMessage(), " 08S01", e);
+          + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
     }
     return transport;
   }
@@ -385,7 +408,7 @@ public class HiveConnection implements j
   private String getClientDelegationToken(Map<String, String> jdbcConnConf)
       throws SQLException {
     String tokenStr = null;
-    if (HIVE_AUTH_TOKEN.equalsIgnoreCase(jdbcConnConf.get(HIVE_AUTH_TYPE))) {
+    if (JdbcConnectionParams.AUTH_TOKEN.equalsIgnoreCase(jdbcConnConf.get(JdbcConnectionParams.AUTH_TYPE))) {
       // check delegation token in job conf if any
       try {
         tokenStr = ShimLoader.getHadoopShims().
@@ -397,7 +420,7 @@ public class HiveConnection implements j
     return tokenStr;
   }
 
-  private void openSession(Utils.JdbcConnectionParams connParams) throws SQLException {
+  private void openSession() throws SQLException {
     TOpenSessionReq openReq = new TOpenSessionReq();
 
     Map<String, String> openConf = new HashMap<String, String>();
@@ -433,7 +456,7 @@ public class HiveConnection implements j
     } catch (TException e) {
       LOG.error("Error opening session", e);
       throw new SQLException("Could not establish connection to "
-          + jdbcURI + ": " + e.getMessage(), " 08S01", e);
+          + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
     }
     isClosed = false;
   }
@@ -442,27 +465,27 @@ public class HiveConnection implements j
    * @return username from sessConfMap
    */
   private String getUserName() {
-    return getSessionValue(HIVE_AUTH_USER, HIVE_ANONYMOUS_USER);
+    return getSessionValue(JdbcConnectionParams.AUTH_USER, JdbcConnectionParams.ANONYMOUS_USER);
   }
 
   /**
    * @return password from sessConfMap
    */
   private String getPassword() {
-    return getSessionValue(HIVE_AUTH_PASSWD, HIVE_ANONYMOUS_PASSWD);
+    return getSessionValue(JdbcConnectionParams.AUTH_PASSWD, JdbcConnectionParams.ANONYMOUS_PASSWD);
   }
 
   private boolean isSslConnection() {
-    return "true".equalsIgnoreCase(sessConfMap.get(HIVE_USE_SSL));
+    return "true".equalsIgnoreCase(sessConfMap.get(JdbcConnectionParams.USE_SSL));
   }
 
   private boolean isKerberosAuthMode() {
-    return !HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE))
-        && sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL);
+    return !JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))
+        && sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL);
   }
 
   private boolean isHttpTransportMode() {
-    String transportMode = hiveConfMap.get(HIVE_SERVER2_TRANSPORT_MODE);
+    String transportMode = hiveConfMap.get(JdbcConnectionParams.TRANSPORT_MODE);
     if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
       return true;
     }

Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java Tue Sep 16 19:40:51 2014
@@ -230,7 +230,12 @@ public class HiveDriver implements Drive
       throw new SQLException("Invalid connection url: " + url);
     }
 
-    JdbcConnectionParams params = Utils.parseURL(url);
+    JdbcConnectionParams params = null;
+    try {
+      params = Utils.parseURL(url);
+    } catch (ZooKeeperHiveClientException e) {
+      throw new SQLException(e);
+    }
     String host = params.getHost();
     if (host == null){
       host = "";
@@ -239,7 +244,7 @@ public class HiveDriver implements Drive
     if(host.equals("")){
       port = "";
     }
-    else if(port.equals("0")){
+    else if(port.equals("0") || port.equals("-1")){
       port = Utils.DEFAULT_PORT;
     }
     String db = params.getDbName();

Added: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java?rev=1625363&view=auto
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java (added)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java Tue Sep 16 19:40:51 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.sql.SQLException;
+
+public class JdbcUriParseException extends SQLException {
+
+  private static final long serialVersionUID = 0;
+
+  /**
+   * @param cause (original exception)
+   */
+  public JdbcUriParseException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param msg (exception message)
+   */
+  public JdbcUriParseException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg (exception message)
+   * @param cause (original exception)
+   */
+  public JdbcUriParseException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+}

Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java Tue Sep 16 19:40:51 2014
@@ -19,17 +19,23 @@
 package org.apache.hive.jdbc;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.thrift.TStatus;
 import org.apache.hive.service.cli.thrift.TStatusCode;
 
 public class Utils {
+  public static final Log LOG = LogFactory.getLog(Utils.class.getName());
   /**
     * The required prefix for the connection URL.
     */
@@ -47,14 +53,58 @@ public class Utils {
 
   private static final String URI_JDBC_PREFIX = "jdbc:";
 
+  private static final String URI_HIVE_PREFIX = "hive2:";
+
   public static class JdbcConnectionParams {
+    // Note on client side parameter naming convention:
+    // Prefer using a shorter camelCase param name instead of using the same name as the
+    // corresponding
+    // HiveServer2 config.
+    // For a jdbc url: jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list,
+    // client side params are specified in sess_var_list
+
+    // Client param names:
+    static final String AUTH_TYPE = "auth";
+    static final String AUTH_QOP = "sasl.qop";
+    static final String AUTH_SIMPLE = "noSasl";
+    static final String AUTH_TOKEN = "delegationToken";
+    static final String AUTH_USER = "user";
+    static final String AUTH_PRINCIPAL = "principal";
+    static final String AUTH_PASSWD = "password";
+    static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
+    static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
+    static final String ANONYMOUS_USER = "anonymous";
+    static final String ANONYMOUS_PASSWD = "anonymous";
+    static final String USE_SSL = "ssl";
+    static final String SSL_TRUST_STORE = "sslTrustStore";
+    static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
+    static final String TRANSPORT_MODE = "hive.server2.transport.mode";
+    static final String HTTP_PATH = "hive.server2.thrift.http.path";
+    static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
+    // Don't use dynamic serice discovery
+    static final String SERVICE_DISCOVERY_MODE_NONE = "none";
+    // Use ZooKeeper for indirection while using dynamic service discovery
+    static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
+    static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
+
+    // Non-configurable params:
+    // ZOOKEEPER_SESSION_TIMEOUT is not exposed as client configurable
+    static final int ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000;
+    // Currently supports JKS keystore format
+    static final String SSL_TRUST_STORE_TYPE = "JKS";
+
     private String host = null;
     private int port;
+    private String jdbcUriString;
     private String dbName = DEFAULT_DATABASE;
     private Map<String,String> hiveConfs = new LinkedHashMap<String,String>();
     private Map<String,String> hiveVars = new LinkedHashMap<String,String>();
     private Map<String,String> sessionVars = new LinkedHashMap<String,String>();
     private boolean isEmbeddedMode = false;
+    private String[] authorityList;
+    private String zooKeeperEnsemble = null;
+    private String currentHostZnodePath;
+    private List<String> rejectedHostZnodePaths = new ArrayList<String>();
 
     public JdbcConnectionParams() {
     }
@@ -62,46 +112,94 @@ public class Utils {
     public String getHost() {
       return host;
     }
+
     public int getPort() {
       return port;
     }
+
+    public String getJdbcUriString() {
+      return jdbcUriString;
+    }
+
     public String getDbName() {
       return dbName;
     }
+
     public Map<String, String> getHiveConfs() {
       return hiveConfs;
     }
-    public Map<String,String> getHiveVars() {
+
+    public Map<String, String> getHiveVars() {
       return hiveVars;
     }
+
     public boolean isEmbeddedMode() {
       return isEmbeddedMode;
     }
+
     public Map<String, String> getSessionVars() {
       return sessionVars;
     }
 
+    public String[] getAuthorityList() {
+      return authorityList;
+    }
+
+    public String getZooKeeperEnsemble() {
+      return zooKeeperEnsemble;
+    }
+
+    public List<String> getRejectedHostZnodePaths() {
+      return rejectedHostZnodePaths;
+    }
+
+    public String getCurrentHostZnodePath() {
+      return currentHostZnodePath;
+    }
+
     public void setHost(String host) {
       this.host = host;
     }
+
     public void setPort(int port) {
       this.port = port;
     }
+
+    public void setJdbcUriString(String jdbcUriString) {
+      this.jdbcUriString = jdbcUriString;
+    }
+
     public void setDbName(String dbName) {
       this.dbName = dbName;
     }
+
     public void setHiveConfs(Map<String, String> hiveConfs) {
       this.hiveConfs = hiveConfs;
     }
-    public void setHiveVars(Map<String,String> hiveVars) {
+
+    public void setHiveVars(Map<String, String> hiveVars) {
       this.hiveVars = hiveVars;
     }
+
     public void setEmbeddedMode(boolean embeddedMode) {
       this.isEmbeddedMode = embeddedMode;
     }
+
     public void setSessionVars(Map<String, String> sessionVars) {
       this.sessionVars = sessionVars;
     }
+
+    public void setSuppliedAuthorityList(String[] authorityList) {
+      this.authorityList = authorityList;
+    }
+
+    public void setZooKeeperEnsemble(String zooKeeperEnsemble) {
+      this.zooKeeperEnsemble = zooKeeperEnsemble;
+    }
+
+    public void setCurrentHostZnodePath(String currentHostZnodePath) {
+      this.currentHostZnodePath = currentHostZnodePath;
+    }
   }
 
   // Verify success or success_with_info status, else throw SQLException
@@ -124,27 +222,33 @@ public class Utils {
 
   /**
    * Parse JDBC connection URL
-   * The new format of the URL is jdbc:hive2://<host>:<port>/dbName;sess_var_list?hive_conf_list#hive_var_list
-   * where the optional sess, conf and var lists are semicolon separated <key>=<val> pairs. As before, if the
-   * host/port is not specified, it the driver runs an embedded hive.
+   * The new format of the URL is:
+   * jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;sess_var_list?hive_conf_list#hive_var_list
+   * where the optional sess, conf and var lists are semicolon separated <key>=<val> pairs.
+   * For utilizing dynamic service discovery with HiveServer2 multiple comma separated host:port pairs can
+   * be specified as shown above.
+   * The JDBC driver resolves the list of uris and picks a specific server instance to connect to.
+   * Currently, dynamic service discovery using ZooKeeper is supported, in which case the host:port pairs represent a ZooKeeper ensemble.
+   *
+   * As before, if the host/port is not specified, it the driver runs an embedded hive.
    * examples -
    *  jdbc:hive2://ubuntu:11000/db2?hive.cli.conf.printheader=true;hive.exec.mode.local.auto.inputbytes.max=9999#stab=salesTable;icol=customerID
    *  jdbc:hive2://?hive.cli.conf.printheader=true;hive.exec.mode.local.auto.inputbytes.max=9999#stab=salesTable;icol=customerID
    *  jdbc:hive2://ubuntu:11000/db2;user=foo;password=bar
    *
    *  Connect to http://server:10001/hs2, with specified basicAuth credentials and initial database:
-   *     jdbc:hive2://server:10001/db;user=foo;password=bar?hive.server2.transport.mode=http;hive.server2.thrift.http.path=hs2
-   *
-   * Note that currently the session properties are not used.
+   *  jdbc:hive2://server:10001/db;user=foo;password=bar?hive.server2.transport.mode=http;hive.server2.thrift.http.path=hs2
    *
    * @param uri
    * @return
+   * @throws SQLException
    */
-  public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentException {
+  public static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException,
+      SQLException, ZooKeeperHiveClientException {
     JdbcConnectionParams connParams = new JdbcConnectionParams();
 
     if (!uri.startsWith(URL_PREFIX)) {
-      throw new IllegalArgumentException("Bad URL format: Missing prefix " + URL_PREFIX);
+      throw new JdbcUriParseException("Bad URL format: Missing prefix " + URL_PREFIX);
     }
 
     // For URLs with no other configuration
@@ -154,29 +258,28 @@ public class Utils {
       return connParams;
     }
 
-    URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length()));
-
-    // Check to prevent unintentional use of embedded mode. A missing "/"
-    // to separate the 'path' portion of URI can result in this.
-    // The missing "/" common typo while using secure mode, eg of such url -
-    // jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM
-    if((jdbcURI.getAuthority() != null) && (jdbcURI.getHost()==null)) {
-       throw new IllegalArgumentException("Bad URL format. Hostname not found "
-           + " in authority part of the url: " + jdbcURI.getAuthority()
-           + ". Are you missing a '/' after the hostname ?");
-    }
-
-    connParams.setHost(jdbcURI.getHost());
-    if (connParams.getHost() == null) {
+    // The JDBC URI now supports specifying multiple host:port if dynamic service discovery is
+    // configured on HiveServer2 (like: host1:port1,host2:port2,host3:port3)
+    // We'll extract the authorities (host:port combo) from the URI, extract session vars, hive
+    // confs & hive vars by parsing it as a Java URI.
+    // To parse the intermediate URI as a Java URI, we'll give a dummy authority(dummy:00000).
+    // Later, we'll substitute the dummy authority for a resolved authority.
+    String dummyAuthorityString = "dummyhost:00000";
+    String suppliedAuthorities = getAuthorities(uri, connParams);
+    if ((suppliedAuthorities == null) || (suppliedAuthorities.isEmpty())) {
+      // Given uri of the form:
+      // jdbc:hive2:///dbName;sess_var_list?hive_conf_list#hive_var_list
       connParams.setEmbeddedMode(true);
     } else {
-      int port = jdbcURI.getPort();
-      if (port == -1) {
-        port = Integer.valueOf(DEFAULT_PORT);
-      }
-      connParams.setPort(port);
+      LOG.info("Supplied authorities: " + suppliedAuthorities);
+      String[] authorityList = suppliedAuthorities.split(",");
+      connParams.setSuppliedAuthorityList(authorityList);
+      uri = uri.replace(suppliedAuthorities, dummyAuthorityString);
     }
 
+    // Now parse the connection uri with dummy authority
+    URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length()));
+
     // key=value pattern
     Pattern pattern = Pattern.compile("([^;]*)=([^;]*)[;]?");
 
@@ -192,12 +295,13 @@ public class Utils {
       } else {
         // we have dbname followed by session parameters
         dbName = sessVars.substring(0, sessVars.indexOf(';'));
-        sessVars = sessVars.substring(sessVars.indexOf(';')+1);
+        sessVars = sessVars.substring(sessVars.indexOf(';') + 1);
         if (sessVars != null) {
           Matcher sessMatcher = pattern.matcher(sessVars);
           while (sessMatcher.find()) {
             if (connParams.getSessionVars().put(sessMatcher.group(1), sessMatcher.group(2)) != null) {
-              throw new IllegalArgumentException("Bad URL format: Multiple values for property " + sessMatcher.group(1));
+              throw new JdbcUriParseException("Bad URL format: Multiple values for property "
+                  + sessMatcher.group(1));
             }
           }
         }
@@ -225,10 +329,146 @@ public class Utils {
       }
     }
 
+    // Extract host, port
+    if (connParams.isEmbeddedMode()) {
+      // In case of embedded mode we were supplied with an empty authority.
+      // So we never substituted the authority with a dummy one.
+      connParams.setHost(jdbcURI.getHost());
+      connParams.setPort(jdbcURI.getPort());
+    } else {
+      // Else substitute the dummy authority with a resolved one.
+      // In case of dynamic service discovery using ZooKeeper, it picks a server uri from ZooKeeper
+      String resolvedAuthorityString = resolveAuthority(connParams);
+      uri = uri.replace(dummyAuthorityString, resolvedAuthorityString);
+      connParams.setJdbcUriString(uri);
+      // Create a Java URI from the resolved URI for extracting the host/port
+      URI resolvedAuthorityURI = null;
+      try {
+        resolvedAuthorityURI = new URI(null, resolvedAuthorityString, null, null, null);
+      } catch (URISyntaxException e) {
+        throw new JdbcUriParseException("Bad URL format: ", e);
+      }
+      connParams.setHost(resolvedAuthorityURI.getHost());
+      connParams.setPort(resolvedAuthorityURI.getPort());
+    }
+
     return connParams;
   }
 
   /**
+   * Get the authority string from the supplied uri, which could potentially contain multiple
+   * host:port pairs.
+   *
+   * @param uri
+   * @param connParams
+   * @return
+   * @throws JdbcUriParseException
+   */
+  private static String getAuthorities(String uri, JdbcConnectionParams connParams)
+      throws JdbcUriParseException {
+    String authorities;
+    // For a jdbc uri like: jdbc:hive2://host1:port1,host2:port2,host3:port3/
+    // Extract the uri host:port list starting after "jdbc:hive2://", till the 1st "/" or EOL
+    int fromIndex = Utils.URL_PREFIX.length();
+    int toIndex = uri.indexOf("/", fromIndex);
+    if (toIndex < 0) {
+      authorities = uri.substring(fromIndex);
+    } else {
+      authorities = uri.substring(fromIndex, uri.indexOf("/", fromIndex));
+    }
+    return authorities;
+  }
+
+  /**
+   * Get a string representing a specific host:port
+   * @param connParams
+   * @return
+   * @throws JdbcUriParseException
+   * @throws ZooKeeperHiveClientException
+   */
+  private static String resolveAuthority(JdbcConnectionParams connParams)
+      throws JdbcUriParseException, ZooKeeperHiveClientException {
+    String serviceDiscoveryMode =
+        connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
+    if ((serviceDiscoveryMode != null)
+        && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER
+            .equalsIgnoreCase(serviceDiscoveryMode))) {
+      // Resolve using ZooKeeper
+      return resolveAuthorityUsingZooKeeper(connParams);
+    } else {
+      String authority = connParams.getAuthorityList()[0];
+      URI jdbcURI = URI.create(URI_HIVE_PREFIX + "//" + authority);
+      // Check to prevent unintentional use of embedded mode. A missing "/"
+      // to separate the 'path' portion of URI can result in this.
+      // The missing "/" common typo while using secure mode, eg of such url -
+      // jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM
+      if ((jdbcURI.getAuthority() != null) && (jdbcURI.getHost() == null)) {
+        throw new JdbcUriParseException("Bad URL format. Hostname not found "
+            + " in authority part of the url: " + jdbcURI.getAuthority()
+            + ". Are you missing a '/' after the hostname ?");
+      }
+      // Return the 1st element of the array
+      return jdbcURI.getAuthority();
+    }
+  }
+
+  /**
+   * Read a specific host:port from ZooKeeper
+   * @param connParams
+   * @return
+   * @throws ZooKeeperHiveClientException
+   */
+  private static String resolveAuthorityUsingZooKeeper(JdbcConnectionParams connParams)
+      throws ZooKeeperHiveClientException {
+    // Set ZooKeeper ensemble in connParams for later use
+    connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
+    return ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
+  }
+
+  /**
+   * Read the next server coordinates (host:port combo) from ZooKeeper. Ignore the znodes already
+   * explored. Also update the host, port, jdbcUriString fields of connParams.
+   *
+   * @param connParams
+   * @throws ZooKeeperHiveClientException
+   */
+  static void updateConnParamsFromZooKeeper(JdbcConnectionParams connParams)
+      throws ZooKeeperHiveClientException {
+    // Add current host to the rejected list
+    connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath());
+    // Get another HiveServer2 uri from ZooKeeper
+    String serverUriString = ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
+    // Parse serverUri to a java URI and extract host, port
+    URI serverUri = null;
+    try {
+      // Note URL_PREFIX is not a valid scheme format, therefore leaving it null in the constructor
+      // to construct a valid URI
+      serverUri = new URI(null, serverUriString, null, null, null);
+    } catch (URISyntaxException e) {
+      throw new ZooKeeperHiveClientException(e);
+    }
+    String oldServerHost = connParams.getHost();
+    int oldServerPort = connParams.getPort();
+    String newServerHost = serverUri.getHost();
+    int newServerPort = serverUri.getPort();
+    connParams.setHost(newServerHost);
+    connParams.setPort(newServerPort);
+    connParams.setJdbcUriString(connParams.getJdbcUriString().replace(
+        oldServerHost + ":" + oldServerPort, newServerHost + ":" + newServerPort));
+  }
+
+  private static String joinStringArray(String[] stringArray, String seperator) {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (int cur = 0, end = stringArray.length; cur < end; cur++) {
+      if (cur > 0) {
+        stringBuilder.append(seperator);
+      }
+      stringBuilder.append(stringArray[cur]);
+    }
+    return stringBuilder.toString();
+  }
+
+  /**
    * Takes a version string delimited by '.' and '-' characters
    * and returns a partial version.
    *

Added: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java?rev=1625363&view=auto
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java (added)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java Tue Sep 16 19:40:51 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.jdbc;
+
+public class ZooKeeperHiveClientException extends Exception {
+
+  private static final long serialVersionUID = 0;
+
+  /**
+   * @param cause (original exception)
+   */
+  public ZooKeeperHiveClientException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param msg (exception message)
+   */
+  public ZooKeeperHiveClientException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg (exception message)
+   * @param cause (original exception)
+   */
+  public ZooKeeperHiveClientException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+}

Added: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java?rev=1625363&view=auto
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java (added)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java Tue Sep 16 19:40:51 2014
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class ZooKeeperHiveClientHelper {
+  public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
+
+  /**
+   * A no-op watcher class
+   */
+  public static class DummyWatcher implements Watcher {
+    public void process(org.apache.zookeeper.WatchedEvent event) {
+    }
+  }
+
+  /**
+   * Resolve to a host:port by connecting to ZooKeeper and picking a host randomly.
+   *
+   * @param uri
+   * @param connParams
+   * @return
+   * @throws SQLException
+   */
+  static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams)
+      throws ZooKeeperHiveClientException {
+    String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
+    String zooKeeperNamespace =
+        connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE);
+    List<String> serverHosts;
+    Random randomizer = new Random();
+    String serverNode;
+    // Pick a random HiveServer2 host from the ZooKeeper namspace
+    try {
+      ZooKeeper zooKeeperClient =
+          new ZooKeeper(zooKeeperEnsemble, JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT,
+              new ZooKeeperHiveClientHelper.DummyWatcher());
+      // All the HiveServer2 host nodes that are in ZooKeeper currently
+      serverHosts = zooKeeperClient.getChildren("/" + zooKeeperNamespace, false);
+      // Remove the znodes we've already tried from this list
+      serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
+      if (serverHosts.isEmpty()) {
+        throw new ZooKeeperHiveClientException(
+            "Tried all existing HiveServer2 uris from ZooKeeper.");
+      }
+      // Now pick a host randomly
+      serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
+      connParams.setCurrentHostZnodePath(serverNode);
+      // Read the value from the node (UTF-8 enoded byte array) and convert it to a String
+      String serverUri =
+          new String(zooKeeperClient.getData("/" + zooKeeperNamespace + "/" + serverNode, false,
+              null), Charset.forName("UTF-8"));
+      LOG.info("Selected HiveServer2 instance with uri: " + serverUri);
+      return serverUri;
+    } catch (Exception e) {
+      throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e);
+    }
+  }
+
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Tue Sep 16 19:40:51 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.metadata.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.KeeperException;
@@ -73,31 +74,6 @@ public class ZooKeeperHiveLockManager im
   }
 
   /**
-   * @param conf  The hive configuration
-   * Get the quorum server address from the configuration. The format is:
-   * host1:port, host2:port..
-   **/
-  @VisibleForTesting
-  static String getQuorumServers(HiveConf conf) {
-    String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(",");
-    String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT);
-    StringBuilder quorum = new StringBuilder();
-    for(int i=0; i<hosts.length; i++) {
-      quorum.append(hosts[i].trim());
-      if (!hosts[i].contains(":")) {
-        // if the hostname doesn't contain a port, add the configured port to hostname
-        quorum.append(":");
-        quorum.append(port);
-      }
-
-      if (i != hosts.length-1)
-        quorum.append(",");
-    }
-
-    return quorum.toString();
-  }
-
-  /**
    * @param ctx  The lock manager context (containing the Hive configuration file)
    * Start the ZooKeeper client based on the zookeeper cluster specified in the conf.
    **/
@@ -105,7 +81,7 @@ public class ZooKeeperHiveLockManager im
     this.ctx = ctx;
     HiveConf conf = ctx.getConf();
     sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
-    quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf);
+    quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
 
     sleepTime = conf.getTimeVar(
         HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
@@ -146,7 +122,7 @@ public class ZooKeeperHiveLockManager im
       return;
     }
 
-    zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher());
+    zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new ZooKeeperHiveHelper.DummyWatcher());
   }
 
   /**
@@ -517,8 +493,8 @@ public class ZooKeeperHiveLockManager im
     ZooKeeper zkpClient = null;
     try {
       int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
-      String quorumServers = getQuorumServers(conf);
-      Watcher dummyWatcher = new DummyWatcher();
+      String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
+      Watcher dummyWatcher = new ZooKeeperHiveHelper.DummyWatcher();
       zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher);
       String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
       List<HiveLock> locks = getLocks(conf, zkpClient, null, parent, false, false);
@@ -629,7 +605,8 @@ public class ZooKeeperHiveLockManager im
 
         if (fetchData) {
           try {
-            data = new HiveLockObjectData(new String(zkpClient.getData(curChild, new DummyWatcher(), null)));
+            data = new HiveLockObjectData(new String(zkpClient.getData(curChild,
+                new ZooKeeperHiveHelper.DummyWatcher(), null)));
             data.setClientIp(clientIp);
           } catch (Exception e) {
             LOG.error("Error in getting data for " + curChild, e);
@@ -789,11 +766,6 @@ public class ZooKeeperHiveLockManager im
     return null;
   }
 
-  public static class DummyWatcher implements Watcher {
-    public void process(org.apache.zookeeper.WatchedEvent event)  {
-    }
-  }
-
   @Override
   public void prepareRetry() throws LockException {
     try {

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java?rev=1625363&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java Tue Sep 16 19:40:51 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+public class ZooKeeperHiveHelper {
+  public static final Log LOG = LogFactory.getLog(ZooKeeperHiveHelper.class.getName());
+  public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
+  /**
+   * Get the ensemble server addresses from the configuration. The format is: host1:port,
+   * host2:port..
+   *
+   * @param conf
+   **/
+  public static String getQuorumServers(HiveConf conf) {
+    String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(",");
+    String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT);
+    StringBuilder quorum = new StringBuilder();
+    for (int i = 0; i < hosts.length; i++) {
+      quorum.append(hosts[i].trim());
+      if (!hosts[i].contains(":")) {
+        // if the hostname doesn't contain a port, add the configured port to hostname
+        quorum.append(":");
+        quorum.append(port);
+      }
+
+      if (i != hosts.length - 1)
+        quorum.append(",");
+    }
+
+    return quorum.toString();
+  }
+
+
+  /**
+   * Create a path on ZooKeeper, if it does not already exist ("mkdir -p")
+   *
+   * @param zooKeeperClient ZooKeeper session
+   * @param path string with ZOOKEEPER_PATH_SEPARATOR as the separator
+   * @param acl list of ACL entries
+   * @param createMode for create mode of each node in the patch
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public static String createPathRecursively(ZooKeeper zooKeeperClient, String path, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    String[] pathComponents = StringUtils.splitByWholeSeparator(path, ZOOKEEPER_PATH_SEPARATOR);
+    String currentPath = "";
+    for (String pathComponent : pathComponents) {
+      currentPath += ZOOKEEPER_PATH_SEPARATOR + pathComponent;
+      try {
+        String node = zooKeeperClient.create(currentPath, new byte[0], acl, createMode);
+        LOG.info("Created path: " + node);
+      } catch (KeeperException.NodeExistsException e) {
+        // Do nothing here
+      }
+    }
+    return currentPath;
+  }
+
+  /**
+   * A no-op watcher class
+   */
+  public static class DummyWatcher implements Watcher {
+    public void process(org.apache.zookeeper.WatchedEvent event) {
+    }
+  }
+
+}

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java Tue Sep 16 19:40:51 2014
@@ -25,6 +25,7 @@ import java.util.Collections;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.Assert;
@@ -87,14 +88,14 @@ public class TestZookeeperLockManager {
   public void testGetQuorumServers() {
     conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1");
     conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
-    Assert.assertEquals("node1:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+    Assert.assertEquals("node1:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
 
     conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1,node2,node3");
     conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
-    Assert.assertEquals("node1:9999,node2:9999,node3:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+    Assert.assertEquals("node1:9999,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
 
     conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1:5666,node2,node3");
     conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
-    Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+    Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
   }
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Tue Sep 16 19:40:51 2014
@@ -66,7 +66,7 @@ public class CLIService extends Composit
   private UserGroupInformation httpUGI;
 
   public CLIService() {
-    super("CLIService");
+    super(CLIService.class.getSimpleName());
   }
 
   @Override
@@ -201,8 +201,7 @@ public class CLIService extends Composit
    * @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle)
    */
   @Override
-  public void closeSession(SessionHandle sessionHandle)
-      throws HiveSQLException {
+  public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
     sessionManager.closeSession(sessionHandle);
     LOG.debug(sessionHandle + ": closeSession()");
   }
@@ -470,4 +469,8 @@ public class CLIService extends Composit
     sessionManager.getSession(sessionHandle).renewDelegationToken(authFactory, tokenStr);
     LOG.info(sessionHandle  + ": renewDelegationToken()");
   }
+
+  public SessionManager getSessionManager() {
+    return sessionManager;
+  }
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Sep 16 19:40:51 2014
@@ -47,7 +47,7 @@ public class OperationManager extends Ab
       new HashMap<OperationHandle, Operation>();
 
   public OperationManager() {
-    super("OperationManager");
+    super(OperationManager.class.getSimpleName());
   }
 
   @Override

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Sep 16 19:40:51 2014
@@ -166,8 +166,8 @@ public class HiveSessionImpl implements 
     IHiveFileProcessor processor = new GlobalHivercFileProcessor();
 
     try {
-      if (hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION) != null) {
-        String hiverc = hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION)
+      if (hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION) != null) {
+        String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION)
             + File.separator + SessionManager.HIVERCFILE;
         if (new File(hiverc).exists()) {
           LOG.info("Running global init file: " + hiverc);

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Sep 16 19:40:51 2014
@@ -67,7 +67,7 @@ public class SessionManager extends Comp
   private volatile boolean shutdown;
 
   public SessionManager() {
-    super("SessionManager");
+    super(SessionManager.class.getSimpleName());
   }
 
   @Override
@@ -356,5 +356,9 @@ public class SessionManager extends Comp
     return backgroundOperationPool.submit(r);
   }
 
+  public int getOpenSessionCount() {
+    return handleToSession.size();
+  }
+
 }
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Tue Sep 16 19:40:51 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hive.service.cli.thrift;
 
-import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -40,72 +39,54 @@ import org.apache.thrift.transport.TTran
 public class ThriftBinaryCLIService extends ThriftCLIService {
 
   public ThriftBinaryCLIService(CLIService cliService) {
-    super(cliService, "ThriftBinaryCLIService");
+    super(cliService, ThriftBinaryCLIService.class.getSimpleName());
   }
 
   @Override
   public void run() {
     try {
-      hiveAuthFactory = new HiveAuthFactory(hiveConf);
-      TTransportFactory  transportFactory = hiveAuthFactory.getAuthTransFactory();
-      TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
-
-      String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
-      if (portString != null) {
-        portNum = Integer.valueOf(portString);
-      } else {
-        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
-      }
-
-      String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
-      if (hiveHost == null) {
-        hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
-      }
-
-      if (hiveHost != null && !hiveHost.isEmpty()) {
-        serverAddress = new InetSocketAddress(hiveHost, portNum);
-      } else {
-        serverAddress = new  InetSocketAddress(portNum);
-      }
-
-      minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
-      maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
-      workerKeepAliveTime = hiveConf.getTimeVar(
-          ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+      // Server thread pool
       String threadPoolName = "HiveServer2-Handler-Pool";
       ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
           workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
           new ThreadFactoryWithGarbageCleanup(threadPoolName));
 
+      // Thrift configs
+      hiveAuthFactory = new HiveAuthFactory(hiveConf);
+      TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
+      TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
       TServerSocket serverSocket = null;
       if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
         serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
       } else {
         String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
         if (keyStorePath.isEmpty()) {
-          throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname +
-              " Not configured for SSL connection");
+          throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+              + " Not configured for SSL connection");
         }
         String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
             HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
-        serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum,
-            keyStorePath, keyStorePassword);
+        serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
+            keyStorePassword);
       }
+
+      // Server args
       TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
-      .processorFactory(processorFactory)
-      .transportFactory(transportFactory)
-      .protocolFactory(new TBinaryProtocol.Factory())
-      .executorService(executorService);
+          .processorFactory(processorFactory).transportFactory(transportFactory)
+          .protocolFactory(new TBinaryProtocol.Factory()).executorService(executorService);
 
+      // TCP Server
       server = new TThreadPoolServer(sargs);
-
-      LOG.info("ThriftBinaryCLIService listening on " + serverAddress);
-
       server.serve();
-
+      String msg = "Started " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+          + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
+      LOG.info(msg);
     } catch (Throwable t) {
-      LOG.error("Error: ", t);
+      LOG.fatal(
+          "Error starting HiveServer2: could not start "
+              + ThriftBinaryCLIService.class.getSimpleName(), t);
+      System.exit(-1);
     }
-
   }
+
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Sep 16 19:40:51 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.LoginException;
 
@@ -34,6 +35,7 @@ import org.apache.hive.service.auth.Hive
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
 import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.server.HiveServer2;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
 
@@ -48,9 +50,11 @@ public abstract class ThriftCLIService e
   protected CLIService cliService;
   private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
   private static final TStatus ERROR_STATUS = new TStatus(TStatusCode.ERROR_STATUS);
+  protected static HiveAuthFactory hiveAuthFactory;
 
   protected int portNum;
   protected InetSocketAddress serverAddress;
+  protected String hiveHost;
   protected TServer server;
   protected org.eclipse.jetty.server.Server httpServer;
 
@@ -62,8 +66,7 @@ public abstract class ThriftCLIService e
   protected int minWorkerThreads;
   protected int maxWorkerThreads;
   protected long workerKeepAliveTime;
-
-  protected static HiveAuthFactory hiveAuthFactory;
+  private HiveServer2 hiveServer2;
 
   public ThriftCLIService(CLIService cliService, String serviceName) {
     super(serviceName);
@@ -73,6 +76,43 @@ public abstract class ThriftCLIService e
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
+
+    // Initialize common server configs needed in both binary & http modes
+    String portString;
+    hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+    if (hiveHost == null) {
+      hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+    }
+    // HTTP mode
+    if (HiveServer2.isHTTPTransportMode(hiveConf)) {
+      workerKeepAliveTime =
+          hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
+              TimeUnit.SECONDS);
+      portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
+      if (portString != null) {
+        portNum = Integer.valueOf(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
+      }
+    }
+    // Binary mode
+    else {
+      workerKeepAliveTime =
+          hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+      portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+      if (portString != null) {
+        portNum = Integer.valueOf(portString);
+      } else {
+        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+      }
+    }
+    if (hiveHost != null && !hiveHost.isEmpty()) {
+      serverAddress = new InetSocketAddress(hiveHost, portNum);
+    } else {
+      serverAddress = new InetSocketAddress(portNum);
+    }
+    minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+    maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
     super.init(hiveConf);
   }
 
@@ -105,6 +145,14 @@ public abstract class ThriftCLIService e
     super.stop();
   }
 
+  public int getPortNumber() {
+    return portNum;
+  }
+
+  public InetSocketAddress getServerAddress() {
+    return serverAddress;
+  }
+
   @Override
   public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
       throws TException {
@@ -308,6 +356,24 @@ public abstract class ThriftCLIService e
     } catch (Exception e) {
       LOG.warn("Error closing session: ", e);
       resp.setStatus(HiveSQLException.toTStatus(e));
+    } finally {
+      if (!(isEmbedded) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY))
+          && (!hiveServer2.isRegisteredWithZooKeeper())) {
+        // Asynchronously shutdown this instance of HiveServer2,
+        // if there are no active client sessions
+        if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+          LOG.info("This instance of HiveServer2 has been removed from the list of server "
+              + "instances available for dynamic service discovery. "
+              + "The last client session has ended - will shutdown now.");
+          Thread shutdownThread = new Thread() {
+            @Override
+            public void run() {
+              hiveServer2.stop();
+            }
+          };
+          shutdownThread.start();
+        }
+      }
     }
     return resp;
   }
@@ -591,5 +657,9 @@ public abstract class ThriftCLIService e
         .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
   }
 
+  public void setHiveServer2(HiveServer2 hiveServer2) {
+    this.hiveServer2 = hiveServer2;
+  }
+
 }