You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/30 22:36:53 UTC

svn commit: r1635621 - in /hive/branches/branch-0.14: ./ jdbc/ jdbc/src/java/org/apache/hive/jdbc/ ql/src/java/org/apache/hadoop/hive/ql/util/ service/ service/src/java/org/apache/hive/service/server/

Author: gunther
Date: Thu Oct 30 21:36:53 2014
New Revision: 1635621

URL: http://svn.apache.org/r1635621
Log:
HIVE-8664: Use Apache Curator in JDBC Driver and HiveServer2 for better reliability (Vaibhav Gumashta, reviewed by Thejas M Nair)

Modified:
    hive/branches/branch-0.14/jdbc/pom.xml
    hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/Utils.java
    hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
    hive/branches/branch-0.14/pom.xml
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
    hive/branches/branch-0.14/service/pom.xml
    hive/branches/branch-0.14/service/src/java/org/apache/hive/service/server/HiveServer2.java

Modified: hive/branches/branch-0.14/jdbc/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/jdbc/pom.xml?rev=1635621&r1=1635620&r2=1635621&view=diff
==============================================================================
--- hive/branches/branch-0.14/jdbc/pom.xml (original)
+++ hive/branches/branch-0.14/jdbc/pom.xml Thu Oct 30 21:36:53 2014
@@ -97,6 +97,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>

Modified: hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/Utils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/Utils.java?rev=1635621&r1=1635620&r2=1635621&view=diff
==============================================================================
--- hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/Utils.java (original)
+++ hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/Utils.java Thu Oct 30 21:36:53 2014
@@ -102,6 +102,7 @@ public class Utils {
     // Non-configurable params:
     // ZOOKEEPER_SESSION_TIMEOUT is not exposed as client configurable
     static final int ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000;
+    static final int ZOOKEEPER_CONNECTION_TIMEOUT = -1;
     // Currently supports JKS keystore format
     static final String SSL_TRUST_STORE_TYPE = "JKS";
 

Modified: hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java?rev=1635621&r1=1635620&r2=1635621&view=diff
==============================================================================
--- hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java (original)
+++ hive/branches/branch-0.14/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java Thu Oct 30 21:36:53 2014
@@ -25,9 +25,11 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
 public class ZooKeeperHiveClientHelper {
   public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
@@ -59,14 +61,14 @@ public class ZooKeeperHiveClientHelper {
     List<String> serverHosts;
     Random randomizer = new Random();
     String serverNode;
-    ZooKeeper zooKeeperClient = null;
-    // Pick a random HiveServer2 host from the ZooKeeper namspace
+    CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .sessionTimeoutMs(JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT)
+            .connectionTimeoutMs(JdbcConnectionParams.ZOOKEEPER_CONNECTION_TIMEOUT)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+    zooKeeperClient.start();
     try {
-      zooKeeperClient =
-          new ZooKeeper(zooKeeperEnsemble, JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT,
-              new ZooKeeperHiveClientHelper.DummyWatcher());
-      // All the HiveServer2 host nodes that are in ZooKeeper currently
-      serverHosts = zooKeeperClient.getChildren("/" + zooKeeperNamespace, false);
+      serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace);
       // Remove the znodes we've already tried from this list
       serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
       if (serverHosts.isEmpty()) {
@@ -76,22 +78,18 @@ public class ZooKeeperHiveClientHelper {
       // Now pick a host randomly
       serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
       connParams.setCurrentHostZnodePath(serverNode);
-      // Read the value from the node (UTF-8 enoded byte array) and convert it to a String
       String serverUri =
-          new String(zooKeeperClient.getData("/" + zooKeeperNamespace + "/" + serverNode, false,
-              null), Charset.forName("UTF-8"));
+          new String(
+              zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode),
+              Charset.forName("UTF-8"));
       LOG.info("Selected HiveServer2 instance with uri: " + serverUri);
       return serverUri;
     } catch (Exception e) {
       throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e);
     } finally {
-      // Try to close the client connection with ZooKeeper
+      // Close the client connection with ZooKeeper
       if (zooKeeperClient != null) {
-        try {
-          zooKeeperClient.close();
-        } catch (Exception e) {
-          // No-op
-        }
+        zooKeeperClient.close();
       }
     }
   }

Modified: hive/branches/branch-0.14/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/pom.xml?rev=1635621&r1=1635620&r2=1635621&view=diff
==============================================================================
--- hive/branches/branch-0.14/pom.xml (original)
+++ hive/branches/branch-0.14/pom.xml Thu Oct 30 21:36:53 2014
@@ -473,13 +473,12 @@
           </exclusion>
         </exclusions>
       </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
-
-        <dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-framework</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.groovy</groupId>
         <artifactId>groovy-all</artifactId>
         <version>${groovy.version}</version>

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java?rev=1635621&r1=1635620&r2=1635621&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java Thu Oct 30 21:36:53 2014
@@ -18,17 +18,10 @@
 
 package org.apache.hadoop.hive.ql.util;
 
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
 
 public class ZooKeeperHiveHelper {
   public static final Log LOG = LogFactory.getLog(ZooKeeperHiveHelper.class.getName());
@@ -59,34 +52,6 @@ public class ZooKeeperHiveHelper {
     return quorum.toString();
   }
 
-
-  /**
-   * Create a path on ZooKeeper, if it does not already exist ("mkdir -p")
-   *
-   * @param zooKeeperClient ZooKeeper session
-   * @param path string with ZOOKEEPER_PATH_SEPARATOR as the separator
-   * @param acl list of ACL entries
-   * @param createMode for create mode of each node in the patch
-   * @return
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  public static String createPathRecursively(ZooKeeper zooKeeperClient, String path, List<ACL> acl,
-      CreateMode createMode) throws KeeperException, InterruptedException {
-    String[] pathComponents = StringUtils.splitByWholeSeparator(path, ZOOKEEPER_PATH_SEPARATOR);
-    String currentPath = "";
-    for (String pathComponent : pathComponents) {
-      currentPath += ZOOKEEPER_PATH_SEPARATOR + pathComponent;
-      try {
-        String node = zooKeeperClient.create(currentPath, new byte[0], acl, createMode);
-        LOG.info("Created path: " + node);
-      } catch (KeeperException.NodeExistsException e) {
-        // Do nothing here
-      }
-    }
-    return currentPath;
-  }
-
   /**
    * A no-op watcher class
    */
@@ -95,5 +60,4 @@ public class ZooKeeperHiveHelper {
     public void process(org.apache.zookeeper.WatchedEvent event) {
     }
   }
-
 }

Modified: hive/branches/branch-0.14/service/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/service/pom.xml?rev=1635621&r1=1635620&r2=1635621&view=diff
==============================================================================
--- hive/branches/branch-0.14/service/pom.xml (original)
+++ hive/branches/branch-0.14/service/pom.xml Thu Oct 30 21:36:53 2014
@@ -86,6 +86,11 @@
       <artifactId>libthrift</artifactId>
       <version>${libthrift.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
     <!-- intra-project -->
     <dependency>
       <groupId>org.apache.hive</groupId>

Modified: hive/branches/branch-0.14/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1635621&r1=1635620&r2=1635621&view=diff
==============================================================================
--- hive/branches/branch-0.14/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/branch-0.14/service/src/java/org/apache/hive/service/server/HiveServer2.java Thu Oct 30 21:36:53 2014
@@ -32,6 +32,10 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -52,7 +56,6 @@ import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 /**
@@ -65,7 +68,7 @@ public class HiveServer2 extends Composi
   private CLIService cliService;
   private ThriftCLIService thriftCLIService;
   private String znodePath;
-  private ZooKeeper zooKeeperClient;
+  private CuratorFramework zooKeeperClient;
   private boolean registeredWithZooKeeper = false;
 
   public HiveServer2() {
@@ -73,7 +76,6 @@ public class HiveServer2 extends Composi
     HiveConf.setLoadHiveServer2Config(true);
   }
 
-
   @Override
   public synchronized void init(HiveConf hiveConf) {
     cliService = new CLIService(this);
@@ -108,6 +110,33 @@ public class HiveServer2 extends Composi
   }
 
   /**
+   * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
+   */
+  private final ACLProvider zooKeeperAclProvider = new ACLProvider() {
+    List<ACL> nodeAcls = new ArrayList<ACL>();
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+        // Read all to the world
+        nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
+        // Create/Delete/Write/Admin to the authenticated user
+        nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
+      } else {
+        // ACLs for znodes on a non-kerberized cluster
+        // Create/Read/Delete/Write/Admin to the world
+        nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
+      }
+      return nodeAcls;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return getDefaultAcl();
+    }
+  };
+
+  /**
    * Adds a server instance to ZooKeeper as a znode.
    *
    * @param hiveConf
@@ -116,28 +145,29 @@ public class HiveServer2 extends Composi
   private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
     int zooKeeperSessionTimeout =
         hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+    int connectTimeoutMillis = -1;
     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
     String instanceURI = getServerInstanceURI(hiveConf);
     byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
-    // Znode ACLs
-    List<ACL> nodeAcls = new ArrayList<ACL>();
-    setUpAuthAndAcls(hiveConf, nodeAcls);
-    // Create a ZooKeeper client
+    setUpZooKeeperAuth(hiveConf);
+    // Create a CuratorFramework instance to be used as the ZooKeeper client
+    // Use the zooKeeperAclProvider to create appropriate ACLs
     zooKeeperClient =
-        new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
-            new ZooKeeperHiveHelper.DummyWatcher());
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .sessionTimeoutMs(zooKeeperSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
+            .aclProvider(zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(1000, 3))
+            .build();
+    zooKeeperClient.start();
     // Create the parent znodes recursively; ignore if the parent already exists.
-    // If pre-creating the parent on a kerberized cluster, ensure that you give ACLs,
-    // as explained in {@link #setUpAuthAndAcls(HiveConf, List<ACL>) setUpAuthAndAcls}
     try {
-      ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls,
-          CreateMode.PERSISTENT);
+      zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+          .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
       LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
     } catch (KeeperException e) {
       if (e.code() != KeeperException.Code.NODEEXISTS) {
         LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
-        throw (e);
+        throw e;
       }
     }
     // Create a znode under the rootNamespace parent for this instance of the server
@@ -148,56 +178,40 @@ public class HiveServer2 extends Composi
               + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
               + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
       znodePath =
-          zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls,
-              CreateMode.EPHEMERAL_SEQUENTIAL);
+          zooKeeperClient.create().creatingParentsIfNeeded()
+              .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8);
       setRegisteredWithZooKeeper(true);
       // Set a watch on the znode
-      if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+      if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
         // No node exists, throw exception
         throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
       }
       LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
     } catch (KeeperException e) {
       LOG.fatal("Unable to create a znode for this server instance", e);
-      throw new Exception(e);
+      throw (e);
     }
   }
 
   /**
-   * Set up ACLs for znodes based on whether the cluster is secure or not.
-   * On a kerberized cluster, ZooKeeper performs Kerberos-SASL authentication.
-   * We give Read privilege to the world, but Create/Delete/Write/Admin to the authenticated user.
-   * On a non-kerberized cluster, we give Create/Read/Delete/Write/Admin privileges to the world.
+   * For a kerberized cluster, we dynamically set up the client's JAAS conf.
    *
-   * For a kerberized cluster, we also dynamically set up the client's JAAS conf.
    * @param hiveConf
-   * @param nodeAcls
    * @return
    * @throws Exception
    */
-  private void setUpAuthAndAcls(HiveConf hiveConf, List<ACL> nodeAcls) throws Exception {
+  private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
     if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
       String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
       if (principal.isEmpty()) {
-        throw new IOException(
-            "HiveServer2 Kerberos principal is empty");
+        throw new IOException("HiveServer2 Kerberos principal is empty");
       }
       String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
       if (keyTabFile.isEmpty()) {
-        throw new IOException(
-            "HiveServer2 Kerberos keytab is empty");
+        throw new IOException("HiveServer2 Kerberos keytab is empty");
       }
-
       // Install the JAAS Configuration for the runtime
       ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
-      // Read all to the world
-      nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
-      // Create/Delete/Write/Admin to the authenticated user
-      nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
-    } else {
-      // ACLs for znodes on a non-kerberized cluster
-      // Create/Read/Delete/Write/Admin to the world
-      nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
     }
   }
 
@@ -333,22 +347,27 @@ public class HiveServer2 extends Composi
     HiveConf hiveConf = new HiveConf();
     int zooKeeperSessionTimeout =
         hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+    int connectTimeoutMillis = -1;
     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
-    ZooKeeper zooKeeperClient =
-        new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
-            new ZooKeeperHiveHelper.DummyWatcher());
-    // Get all znode paths
+    CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .sessionTimeoutMs(zooKeeperSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+    zooKeeperClient.start();
     List<String> znodePaths =
-        zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace,
-            false);
+        zooKeeperClient.getChildren().forPath(
+            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
     // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
     for (String znodePath : znodePaths) {
       if (znodePath.contains("version=" + versionNumber + ";")) {
-        zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
-            + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1);
+        LOG.info("Removing the znode: " + znodePath + " from ZooKeeper");
+        zooKeeperClient.delete().forPath(
+            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+                + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath);
       }
     }
+    zooKeeperClient.close();
   }
 
   public static void main(String[] args) {
@@ -503,8 +522,8 @@ public class HiveServer2 extends Composi
   }
 
   /**
-   * DeregisterOptionExecutor: executes the --deregister option by
-   * deregistering all HiveServer2 instances from ZooKeeper of a specific version.
+   * DeregisterOptionExecutor: executes the --deregister option by deregistering all HiveServer2
+   * instances from ZooKeeper of a specific version.
    */
   static class DeregisterOptionExecutor implements ServerOptionsExecutor {
     private final String versionNumber;
@@ -526,4 +545,3 @@ public class HiveServer2 extends Composi
     }
   }
 }
-