You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/12 07:08:02 UTC

svn commit: r1651036 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ itests/util/src/main/java/org/apache/hadoop/hive/ql/ ql/ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/

Author: xuefu
Date: Mon Jan 12 06:08:02 2015
New Revision: 1651036

URL: http://svn.apache.org/r1651036
Log:
HIVE-9119: ZooKeeperHiveLockManager does not use zookeeper in the proper way (Na via Xuefu)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/ql/pom.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jan 12 06:08:02 2015
@@ -1323,13 +1323,20 @@ public class HiveConf extends Configurat
         "The port of ZooKeeper servers to talk to.\n" +
         "If the list of Zookeeper servers specified in hive.zookeeper.quorum\n" +
         "does not contain port numbers, this value is used."),
-    HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000,
-        "ZooKeeper client's session timeout. The client is disconnected, and as a result, all locks released, \n" +
+    HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", "600000ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" +
         "if a heartbeat is not sent in the timeout."),
     HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace",
         "The parent node under which all ZooKeeper nodes are created."),
     HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false,
         "Clean extra nodes at the end of the session."),
+    HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES("hive.zookeeper.connection.max.retries", 3,
+        "Max number of times to retry when connecting to the ZooKeeper server."),
+    HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME("hive.zookeeper.connection.basesleeptime", "1000ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "Initial amount of time (in milliseconds) to wait between retries\n" +
+        "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
 
     // Transactions
     HIVE_TXN_MANAGER("hive.txn.manager",

Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Mon Jan 12 06:08:02 2015
@@ -49,6 +49,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -75,6 +76,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton;
 import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -124,7 +126,7 @@ public class QTestUtil {
   private static MiniClusterType clusterType = MiniClusterType.none;
   private ParseDriver pd;
   protected Hive db;
-  protected HiveConf conf;
+  static protected HiveConf conf;
   private Driver drv;
   private BaseSemanticAnalyzer sem;
   protected final boolean overWrite;
@@ -1467,7 +1469,7 @@ public class QTestUtil {
         zooKeeper.close();
       }
 
-      int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+      int sessionTimeout =  (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
       zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() {
         @Override
         public void process(WatchedEvent arg0) {
@@ -1492,6 +1494,8 @@ public class QTestUtil {
     }
 
     public void tearDown() throws Exception {
+      CuratorFrameworkSingleton.closeAndReleaseInstance();
+
       if (zooKeeperCluster != null) {
         zooKeeperCluster.shutdown();
         zooKeeperCluster = null;

Modified: hive/trunk/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/pom.xml?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/trunk/ql/pom.xml Mon Jan 12 06:08:02 2015
@@ -173,6 +173,23 @@
       <version>${zookeeper.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
+    <dependency>
+    <groupId>org.apache.curator</groupId>
+      <artifactId>apache-curator</artifactId>
+      <version>${curator.version}</version>
+      <type>pom</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>${curator.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.codehaus.groovy</groupId>
       <artifactId>groovy-all</artifactId>
       <version>${groovy.version}</version>

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java?rev=1651036&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java Mon Jan 12 06:08:02 2015
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
+
+public class CuratorFrameworkSingleton {
+  private static HiveConf conf = null;
+  private static CuratorFramework sharedClient = null;
+  static final Log LOG = LogFactory.getLog("CuratorFrameworkSingleton");
+  static {
+    // Add shutdown hook.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        closeAndReleaseInstance();
+      }
+    });
+  }
+
+  public static synchronized CuratorFramework getInstance(HiveConf hiveConf) {
+    if (sharedClient == null) {
+      // Create a client instance
+      if (hiveConf == null) {
+        conf = new HiveConf();
+      } else {
+        conf = hiveConf;
+      }
+      int sessionTimeout =  (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+      int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+      int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+      String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
+
+      sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers)
+          .sessionTimeoutMs(sessionTimeout)
+          .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+          .build();
+      sharedClient.start();
+    }
+
+    return sharedClient;
+  }
+
+  public static synchronized void closeAndReleaseInstance() {
+    if (sharedClient != null) {
+      sharedClient.close();
+      sharedClient = null;
+      String shutdownMsg = "Closing ZooKeeper client.";
+      LOG.info(shutdownMsg);
+    }
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Mon Jan 12 06:08:02 2015
@@ -27,15 +27,10 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.metadata.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -47,14 +42,11 @@ public class ZooKeeperHiveLockManager im
   public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager");
   static final private LogHelper console = new LogHelper(LOG);
 
-  private ZooKeeper zooKeeper;
+  private static CuratorFramework curatorFramework;
 
   // All the locks are created under this parent
   private String    parent;
 
-  private int sessionTimeout;
-  private String quorumServers;
-
   private long sleepTime;
   private int numRetriesForLock;
   private int numRetriesForUnLock;
@@ -80,8 +72,6 @@ public class ZooKeeperHiveLockManager im
   public void setContext(HiveLockManagerCtx ctx) throws LockException {
     this.ctx = ctx;
     HiveConf conf = ctx.getConf();
-    sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
-    quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
 
     sleepTime = conf.getTimeVar(
         HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
@@ -89,20 +79,18 @@ public class ZooKeeperHiveLockManager im
     numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
 
     try {
-      renewZookeeperInstance(sessionTimeout, quorumServers);
+      curatorFramework = CuratorFrameworkSingleton.getInstance(conf);
       parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
-
-      try {
-        zooKeeper.create("/" +  parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      } catch (KeeperException e) {
+      try{
+        curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/" +  parent, new byte[0]);
+      } catch (Exception e) {
         // ignore if the parent already exists
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
+        if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
           LOG.warn("Unexpected ZK exception when creating parent node /" + parent, e);
         }
       }
-
     } catch (Exception e) {
-      LOG.error("Failed to create ZooKeeper object: ", e);
+      LOG.error("Failed to create curatorFramework object: ", e);
       throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
     }
   }
@@ -116,15 +104,6 @@ public class ZooKeeperHiveLockManager im
     numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
   }
 
-  private void renewZookeeperInstance(int sessionTimeout, String quorumServers)
-      throws InterruptedException, IOException {
-    if (zooKeeper != null) {
-      return;
-    }
-
-    zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new ZooKeeperHiveHelper.DummyWatcher());
-  }
-
   /**
    * @param key    object to be locked
    * Get the name of the last string. For eg. if you need to lock db/T/ds=1=/hr=1,
@@ -266,8 +245,8 @@ public class ZooKeeperHiveLockManager im
    * @throws InterruptedException
    **/
   private String createChild(String name, byte[] data, CreateMode mode)
-      throws KeeperException, InterruptedException {
-    return zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode);
+      throws Exception {
+    return curatorFramework.create().withMode(mode).forPath(name, data);
   }
 
   private String getLockName(String parent, HiveLockMode mode) {
@@ -347,7 +326,7 @@ public class ZooKeeperHiveLockManager im
   private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
       HiveLockMode mode, boolean keepAlive, boolean parentCreated,
       Set<String> conflictingLocks)
-      throws KeeperException, InterruptedException {
+      throws Exception {
     String res;
 
     // If the parents have already been created, create the last child only
@@ -369,8 +348,8 @@ public class ZooKeeperHiveLockManager im
     for (String name : names) {
       try {
         res = createChild(name, new byte[0], CreateMode.PERSISTENT);
-      } catch (KeeperException e) {
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
+      } catch (Exception e) {
+        if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
           //if the exception is not 'NODEEXISTS', re-throw it
           throw e;
         }
@@ -383,11 +362,11 @@ public class ZooKeeperHiveLockManager im
 
     int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
     if (seqNo == -1) {
-      zooKeeper.delete(res, -1);
+      curatorFramework.delete().forPath(res);
       return null;
     }
 
-    List<String> children = zooKeeper.getChildren(lastName, false);
+    List<String> children = curatorFramework.getChildren().forPath(lastName);
 
     String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE);
     String shLock = getLockName(lastName, HiveLockMode.SHARED);
@@ -407,12 +386,11 @@ public class ZooKeeperHiveLockManager im
 
       if ((childSeq >= 0) && (childSeq < seqNo)) {
         try {
-          zooKeeper.delete(res, -1);
+          curatorFramework.delete().forPath(res);
         } finally {
           if (LOG.isDebugEnabled()) {
-            Stat stat = new Stat();
             try {
-              String data = new String(zooKeeper.getData(child, false, stat));
+              String data = new String(curatorFramework.getData().forPath(child));
               conflictingLocks.add(data);
             } catch (Exception e) {
               //ignored
@@ -428,11 +406,10 @@ public class ZooKeeperHiveLockManager im
 
   /* Remove the lock specified */
   public void unlock(HiveLock hiveLock) throws LockException {
-    unlockWithRetry(ctx.getConf(), zooKeeper, hiveLock, parent);
+    unlockWithRetry(hiveLock, parent);
   }
 
-  private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient,
-      HiveLock hiveLock, String parent) throws LockException {
+  private void unlockWithRetry(HiveLock hiveLock, String parent) throws LockException {
 
     int tryNum = 0;
     do {
@@ -440,14 +417,13 @@ public class ZooKeeperHiveLockManager im
         tryNum++;
         if (tryNum > 1) {
           Thread.sleep(sleepTime);
-          prepareRetry();
         }
-        unlockPrimitive(conf, zkpClient, hiveLock, parent);
+        unlockPrimitive(hiveLock, parent, curatorFramework);
         break;
       } catch (Exception e) {
         if (tryNum >= numRetriesForUnLock) {
           String name = ((ZooKeeperHiveLock)hiveLock).getPath();
-          LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.");  
+          LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.");
           throw new LockException(e);
         }
       }
@@ -458,21 +434,20 @@ public class ZooKeeperHiveLockManager im
 
   /* Remove the lock specified */
   @VisibleForTesting
-  static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient,
-                             HiveLock hiveLock, String parent) throws LockException {
+  static void unlockPrimitive(HiveLock hiveLock, String parent, CuratorFramework curatorFramework) throws LockException {
     ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
     HiveLockObject obj = zLock.getHiveLockObject();
     String name  = getLastObjectName(parent, obj);
     try {
-      zkpClient.delete(zLock.getPath(), -1);
+      curatorFramework.delete().forPath(zLock.getPath());
 
       // Delete the parent node if all the children have been deleted
-      List<String> children = zkpClient.getChildren(name, false);
+      List<String> children = curatorFramework.getChildren().forPath(name);
       if (children == null || children.isEmpty()) {
-        zkpClient.delete(name, -1);
+        curatorFramework.delete().forPath(name);
       }
     } catch (KeeperException.NoNodeException nne) {
-      //can happen in retrying deleting the zLock after exceptions like InterruptedException 
+      //can happen in retrying deleting the zLock after exceptions like InterruptedException
       //or in a race condition where parent has already been deleted by other process when it
       //is to be deleted. Both cases should not raise error
       LOG.debug("Node " + zLock.getPath() + " or its parent has already been deleted.");
@@ -480,7 +455,7 @@ public class ZooKeeperHiveLockManager im
       //can happen in a race condition where another process adds a zLock under this parent
       //just before it is about to be deleted. It should not be a problem since this parent
       //can eventually be deleted by the process which hold its last child zLock
-      LOG.debug("Node " + name + " to be deleted is not empty.");  
+      LOG.debug("Node " + name + " to be deleted is not empty.");
     } catch (Exception e) {
       //exceptions including InterruptException and other KeeperException
       LOG.error("Failed to release ZooKeeper lock: ", e);
@@ -490,19 +465,14 @@ public class ZooKeeperHiveLockManager im
 
   /* Release all locks - including PERSISTENT locks */
   public static void releaseAllLocks(HiveConf conf) throws Exception {
-    ZooKeeper zkpClient = null;
     try {
-      int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
-      String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
-      Watcher dummyWatcher = new ZooKeeperHiveHelper.DummyWatcher();
-      zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher);
       String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
-      List<HiveLock> locks = getLocks(conf, zkpClient, null, parent, false, false);
+      List<HiveLock> locks = getLocks(conf, null, parent, false, false);
       Exception lastExceptionGot = null;
       if (locks != null) {
         for (HiveLock lock : locks) {
           try {
-            unlockPrimitive(conf, zkpClient, lock, parent);
+            unlockPrimitive(lock, parent, curatorFramework);
           } catch (Exception e) {
             lastExceptionGot = e;
           }
@@ -516,24 +486,19 @@ public class ZooKeeperHiveLockManager im
     } catch (Exception e) {
       LOG.error("Failed to release all locks: ", e);
       throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
-    } finally {
-      if (zkpClient != null) {
-        zkpClient.close();
-        zkpClient = null;
-      }
     }
   }
 
   /* Get all locks */
   public List<HiveLock> getLocks(boolean verifyTablePartition, boolean fetchData)
     throws LockException {
-    return getLocks(ctx.getConf(), zooKeeper, null, parent, verifyTablePartition, fetchData);
+    return getLocks(ctx.getConf(), null, parent, verifyTablePartition, fetchData);
   }
 
   /* Get all locks for a particular object */
   public List<HiveLock> getLocks(HiveLockObject key, boolean verifyTablePartitions,
                                  boolean fetchData) throws LockException {
-    return getLocks(ctx.getConf(), zooKeeper, key, parent, verifyTablePartitions, fetchData);
+    return getLocks(ctx.getConf(), key, parent, verifyTablePartitions, fetchData);
   }
 
   /**
@@ -541,7 +506,7 @@ public class ZooKeeperHiveLockManager im
    * @param zkpClient   The ZooKeeper client
    * @param key         The object to be compared against - if key is null, then get all locks
    **/
-  private static List<HiveLock> getLocks(HiveConf conf, ZooKeeper zkpClient,
+  private static List<HiveLock> getLocks(HiveConf conf,
       HiveLockObject key, String parent, boolean verifyTablePartition, boolean fetchData)
       throws LockException {
     List<HiveLock> locks = new ArrayList<HiveLock>();
@@ -552,12 +517,12 @@ public class ZooKeeperHiveLockManager im
     try {
       if (key != null) {
         commonParent = "/" + parent + "/" + key.getName();
-        children = zkpClient.getChildren(commonParent, false);
+        children = curatorFramework.getChildren().forPath(commonParent);
         recurse = false;
       }
       else {
         commonParent = "/" + parent;
-        children = zkpClient.getChildren(commonParent, false);
+        children = curatorFramework.getChildren().forPath(commonParent);
       }
     } catch (Exception e) {
       // no locks present
@@ -579,7 +544,7 @@ public class ZooKeeperHiveLockManager im
 
       if (recurse) {
         try {
-          children = zkpClient.getChildren(curChild, false);
+          children = curatorFramework.getChildren().forPath(curChild);
           for (String child : children) {
             childn.add(curChild + "/" + child);
           }
@@ -588,7 +553,7 @@ public class ZooKeeperHiveLockManager im
         }
       }
 
-      HiveLockMode mode = getLockMode(conf, curChild);
+      HiveLockMode mode = getLockMode(curChild);
       if (mode == null) {
         continue;
       }
@@ -605,8 +570,7 @@ public class ZooKeeperHiveLockManager im
 
         if (fetchData) {
           try {
-            data = new HiveLockObjectData(new String(zkpClient.getData(curChild,
-                new ZooKeeperHiveHelper.DummyWatcher(), null)));
+            data = new HiveLockObjectData(new String(curatorFramework.getData().watched().forPath(curChild)));
             data.setClientIp(clientIp);
           } catch (Exception e) {
             LOG.error("Error in getting data for " + curChild, e);
@@ -623,12 +587,7 @@ public class ZooKeeperHiveLockManager im
   /** Remove all redundant nodes **/
   private void removeAllRedundantNodes() {
     try {
-      renewZookeeperInstance(sessionTimeout, quorumServers);
       checkRedundantNode("/" + parent);
-      if (zooKeeper != null) {
-        zooKeeper.close();
-        zooKeeper = null;
-      }
     } catch (Exception e) {
       LOG.warn("Exception while removing all redundant nodes", e);
     }
@@ -637,19 +596,19 @@ public class ZooKeeperHiveLockManager im
   private void checkRedundantNode(String node) {
     try {
       // Nothing to do if it is a lock mode
-      if (getLockMode(ctx.getConf(), node) != null) {
+      if (getLockMode(node) != null) {
         return;
       }
 
-      List<String> children = zooKeeper.getChildren(node, false);
+      List<String> children = curatorFramework.getChildren().forPath(node);
       for (String child : children) {
         checkRedundantNode(node + "/" + child);
       }
 
-      children = zooKeeper.getChildren(node, false);
+      children = curatorFramework.getChildren().forPath(node);
       if ((children == null) || (children.isEmpty()))
       {
-        zooKeeper.delete(node, -1);
+        curatorFramework.delete().forPath(node);
       }
     } catch (Exception e) {
       LOG.warn("Error in checkRedundantNode for node " + node, e);
@@ -658,12 +617,7 @@ public class ZooKeeperHiveLockManager im
 
   /* Release all transient locks, by simply closing the client */
   public void close() throws LockException {
-    try {
-
-      if (zooKeeper != null) {
-        zooKeeper.close();
-        zooKeeper = null;
-      }
+  try {
 
       if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) {
         removeAllRedundantNodes();
@@ -750,7 +704,7 @@ public class ZooKeeperHiveLockManager im
   private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$");
 
   /* Get the mode of the lock encoded in the path */
-  private static HiveLockMode getLockMode(HiveConf conf, String path) {
+  private static HiveLockMode getLockMode(String path) {
 
     Matcher shMatcher = shMode.matcher(path);
     Matcher exMatcher = exMode.matcher(path);
@@ -768,15 +722,6 @@ public class ZooKeeperHiveLockManager im
 
   @Override
   public void prepareRetry() throws LockException {
-    try {
-      if (zooKeeper != null && zooKeeper.getState() == ZooKeeper.States.CLOSED) {
-        // Reconnect if the connection is closed.
-        zooKeeper = null;
-      }
-      renewZookeeperInstance(sessionTimeout, quorumServers);
-    } catch (Exception e) {
-      throw new LockException(e);
-    }
   }
 
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java Mon Jan 12 06:08:02 2015
@@ -18,70 +18,82 @@
 
 package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
 
-import static org.mockito.Mockito.*;
-
-import java.util.Collections;
-
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.After;
 import org.junit.Test;
 
-import com.google.common.base.Joiner;
-
 public class TestZookeeperLockManager {
 
-  private static final Joiner SLASH = Joiner.on("/");
-  private static final String PARENT = "hive";
-  private static final String TABLE = "t1";
-  private static final String PARENT_LOCK_PATH = SLASH.join("", PARENT, TABLE);
-  private static final String TABLE_LOCK_PATH =  SLASH.join("", PARENT, TABLE, "00001");
   private HiveConf conf;
-  private ZooKeeper zooKeeper;
+  private TestingServer server;
+  private CuratorFramework client;
   private HiveLockObject hiveLock;
   private ZooKeeperHiveLock zLock;
+  private HiveLockObjectData lockObjData;
+  private static final String PARENT = "hive";
+  private static final String TABLE = "t1";
+  private static final String PARENT_LOCK_PATH = "/hive/t1";
+  private static final String TABLE_LOCK_PATH = "/hive/t1/00001";
 
   @Before
   public void setup() {
     conf = new HiveConf();
-    zooKeeper = mock(ZooKeeper.class);
-    hiveLock = mock(HiveLockObject.class);
-    when(hiveLock.getName()).thenReturn(TABLE);
+    lockObjData = new HiveLockObjectData("1", "10", "SHARED", "show tables");
+    hiveLock = new HiveLockObject(TABLE, lockObjData);
     zLock = new ZooKeeperHiveLock(TABLE_LOCK_PATH, hiveLock, HiveLockMode.SHARED);
-  }
 
-  @Test
-  public void testDeleteNoChildren() throws Exception {
-    ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
-    verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
-    verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
-    verify(zooKeeper).delete(PARENT_LOCK_PATH, -1);
-    verifyNoMoreInteractions(zooKeeper);
+    while (server == null)
+    {
+      try {
+        server = new TestingServer();
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+        client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
+        client.start();
+      } catch (Exception e) {
+        System.err.println("Getting bind exception - retrying to allocate server");
+        server = null;
+      }
+    }
   }
-  /**
-   * Tests two threads racing to delete PARENT_LOCK_PATH
-   */
-  @Test
-  public void testDeleteNoChildrenNodeDoesNotExist() throws Exception {
-    doThrow(new KeeperException.NoNodeException()).when(zooKeeper).delete(PARENT_LOCK_PATH, -1);
-    ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
-    verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
-    verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
-    verify(zooKeeper).delete(PARENT_LOCK_PATH, -1);
-    verifyNoMoreInteractions(zooKeeper);
+
+  @After
+  public void teardown() throws Exception
+  {
+    client.close();
+    server.close();
+    server = null;
   }
+
   @Test
-  public void testDeleteWithChildren() throws Exception {
-    when(zooKeeper.getChildren(PARENT_LOCK_PATH, false)).thenReturn(Collections.singletonList("somechild"));
-    ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
-    verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
-    verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
-    verifyNoMoreInteractions(zooKeeper);
+  public void testDeleteNoChildren() throws Exception
+  {
+    client.create().creatingParentsIfNeeded().forPath(TABLE_LOCK_PATH, lockObjData.toString().getBytes());
+    byte[] data = client.getData().forPath(TABLE_LOCK_PATH);
+    Assert.assertArrayEquals(lockObjData.toString().getBytes(), data);
+    ZooKeeperHiveLockManager.unlockPrimitive(zLock, PARENT, client);
+    try {
+      data = client.getData().forPath(TABLE_LOCK_PATH);
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals( e instanceof KeeperException.NoNodeException, true);
+    }
+    try {
+      data = client.getData().forPath(PARENT_LOCK_PATH);
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals( e instanceof KeeperException.NoNodeException, true);
+    }
   }
 
   @Test
@@ -99,3 +111,4 @@ public class TestZookeeperLockManager {
     Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
   }
 }
+