You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/12 07:08:02 UTC
svn commit: r1651036 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
itests/util/src/main/java/org/apache/hadoop/hive/ql/ ql/
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/
ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/
Author: xuefu
Date: Mon Jan 12 06:08:02 2015
New Revision: 1651036
URL: http://svn.apache.org/r1651036
Log:
HIVE-9119: ZooKeeperHiveLockManager does not use zookeeper in the proper way (Na via Xuefu)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
hive/trunk/ql/pom.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jan 12 06:08:02 2015
@@ -1323,13 +1323,20 @@ public class HiveConf extends Configurat
"The port of ZooKeeper servers to talk to.\n" +
"If the list of Zookeeper servers specified in hive.zookeeper.quorum\n" +
"does not contain port numbers, this value is used."),
- HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000,
- "ZooKeeper client's session timeout. The client is disconnected, and as a result, all locks released, \n" +
+ HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", "600000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" +
"if a heartbeat is not sent in the timeout."),
HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace",
"The parent node under which all ZooKeeper nodes are created."),
HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false,
"Clean extra nodes at the end of the session."),
+ HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES("hive.zookeeper.connection.max.retries", 3,
+ "Max number of times to retry when connecting to the ZooKeeper server."),
+ HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME("hive.zookeeper.connection.basesleeptime", "1000ms",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "Initial amount of time (in milliseconds) to wait between retries\n" +
+ "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
// Transactions
HIVE_TXN_MANAGER("hive.txn.manager",
Modified: hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/trunk/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Mon Jan 12 06:08:02 2015
@@ -49,6 +49,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -124,7 +126,7 @@ public class QTestUtil {
private static MiniClusterType clusterType = MiniClusterType.none;
private ParseDriver pd;
protected Hive db;
- protected HiveConf conf;
+ static protected HiveConf conf;
private Driver drv;
private BaseSemanticAnalyzer sem;
protected final boolean overWrite;
@@ -1467,7 +1469,7 @@ public class QTestUtil {
zooKeeper.close();
}
- int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+ int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent arg0) {
@@ -1492,6 +1494,8 @@ public class QTestUtil {
}
public void tearDown() throws Exception {
+ CuratorFrameworkSingleton.closeAndReleaseInstance();
+
if (zooKeeperCluster != null) {
zooKeeperCluster.shutdown();
zooKeeperCluster = null;
Modified: hive/trunk/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/pom.xml?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/trunk/ql/pom.xml Mon Jan 12 06:08:02 2015
@@ -173,6 +173,23 @@
<version>${zookeeper.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>apache-curator</artifactId>
+ <version>${curator.version}</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>${groovy.version}</version>
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java?rev=1651036&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/CuratorFrameworkSingleton.java Mon Jan 12 06:08:02 2015
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
+
+public class CuratorFrameworkSingleton { // Holds one static CuratorFramework client that all callers of getInstance() share.
+ private static HiveConf conf = null; // configuration chosen when the shared client was last created
+ private static CuratorFramework sharedClient = null; // null until first getInstance(), and again after closeAndReleaseInstance()
+ static final Log LOG = LogFactory.getLog("CuratorFrameworkSingleton");
+ static {
+ // Register a JVM shutdown hook so the shared client is closed when the process exits.
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ closeAndReleaseInstance();
+ }
+ });
+ }
+ /** Returns the shared client, creating and starting it on first use; hiveConf is only consulted when a client has to be created. */
+ public static synchronized CuratorFramework getInstance(HiveConf hiveConf) {
+ if (sharedClient == null) {
+ // No client exists yet (first call, or first call after a release): choose the configuration, then build and start one.
+ if (hiveConf == null) {
+ conf = new HiveConf(); // no configuration supplied: fall back to a freshly constructed HiveConf
+ } else {
+ conf = hiveConf;
+ }
+ int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); // value is narrowed to int by the cast
+ int baseSleepTime = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); // value is narrowed to int by the cast
+ int maxRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
+ String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
+ // Build the client against the quorum string with the configured session timeout and an exponential-backoff retry policy.
+ sharedClient = CuratorFrameworkFactory.builder().connectString(quorumServers)
+ .sessionTimeoutMs(sessionTimeout)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+ .build();
+ sharedClient.start();
+ }
+ // Reached on every call: an existing client is returned as-is, whatever hiveConf was passed this time.
+ return sharedClient;
+ }
+ /** Closes the shared client if one exists and clears the reference, so the next getInstance() call creates a new client. */
+ public static synchronized void closeAndReleaseInstance() {
+ if (sharedClient != null) {
+ sharedClient.close();
+ sharedClient = null;
+ String shutdownMsg = "Closing ZooKeeper client."; // NOTE(review): logged after close() has already returned
+ LOG.info(shutdownMsg);
+ }
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Mon Jan 12 06:08:02 2015
@@ -27,15 +27,10 @@ import org.apache.hadoop.hive.ql.lockmgr
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
-import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -47,14 +42,11 @@ public class ZooKeeperHiveLockManager im
public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager");
static final private LogHelper console = new LogHelper(LOG);
- private ZooKeeper zooKeeper;
+ private static CuratorFramework curatorFramework;
// All the locks are created under this parent
private String parent;
- private int sessionTimeout;
- private String quorumServers;
-
private long sleepTime;
private int numRetriesForLock;
private int numRetriesForUnLock;
@@ -80,8 +72,6 @@ public class ZooKeeperHiveLockManager im
public void setContext(HiveLockManagerCtx ctx) throws LockException {
this.ctx = ctx;
HiveConf conf = ctx.getConf();
- sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
- quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
sleepTime = conf.getTimeVar(
HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
@@ -89,20 +79,18 @@ public class ZooKeeperHiveLockManager im
numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
try {
- renewZookeeperInstance(sessionTimeout, quorumServers);
+ curatorFramework = CuratorFrameworkSingleton.getInstance(conf);
parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
-
- try {
- zooKeeper.create("/" + parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
+ try{
+ curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/" + parent, new byte[0]);
+ } catch (Exception e) {
// ignore if the parent already exists
- if (e.code() != KeeperException.Code.NODEEXISTS) {
+ if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
LOG.warn("Unexpected ZK exception when creating parent node /" + parent, e);
}
}
-
} catch (Exception e) {
- LOG.error("Failed to create ZooKeeper object: ", e);
+ LOG.error("Failed to create curatorFramework object: ", e);
throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
}
}
@@ -116,15 +104,6 @@ public class ZooKeeperHiveLockManager im
numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
}
- private void renewZookeeperInstance(int sessionTimeout, String quorumServers)
- throws InterruptedException, IOException {
- if (zooKeeper != null) {
- return;
- }
-
- zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new ZooKeeperHiveHelper.DummyWatcher());
- }
-
/**
* @param key object to be locked
* Get the name of the last string. For eg. if you need to lock db/T/ds=1=/hr=1,
@@ -266,8 +245,8 @@ public class ZooKeeperHiveLockManager im
* @throws InterruptedException
**/
private String createChild(String name, byte[] data, CreateMode mode)
- throws KeeperException, InterruptedException {
- return zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode);
+ throws Exception {
+ return curatorFramework.create().withMode(mode).forPath(name, data);
}
private String getLockName(String parent, HiveLockMode mode) {
@@ -347,7 +326,7 @@ public class ZooKeeperHiveLockManager im
private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
HiveLockMode mode, boolean keepAlive, boolean parentCreated,
Set<String> conflictingLocks)
- throws KeeperException, InterruptedException {
+ throws Exception {
String res;
// If the parents have already been created, create the last child only
@@ -369,8 +348,8 @@ public class ZooKeeperHiveLockManager im
for (String name : names) {
try {
res = createChild(name, new byte[0], CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- if (e.code() != KeeperException.Code.NODEEXISTS) {
+ } catch (Exception e) {
+ if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
//if the exception is not 'NODEEXISTS', re-throw it
throw e;
}
@@ -383,11 +362,11 @@ public class ZooKeeperHiveLockManager im
int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
if (seqNo == -1) {
- zooKeeper.delete(res, -1);
+ curatorFramework.delete().forPath(res);
return null;
}
- List<String> children = zooKeeper.getChildren(lastName, false);
+ List<String> children = curatorFramework.getChildren().forPath(lastName);
String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE);
String shLock = getLockName(lastName, HiveLockMode.SHARED);
@@ -407,12 +386,11 @@ public class ZooKeeperHiveLockManager im
if ((childSeq >= 0) && (childSeq < seqNo)) {
try {
- zooKeeper.delete(res, -1);
+ curatorFramework.delete().forPath(res);
} finally {
if (LOG.isDebugEnabled()) {
- Stat stat = new Stat();
try {
- String data = new String(zooKeeper.getData(child, false, stat));
+ String data = new String(curatorFramework.getData().forPath(child));
conflictingLocks.add(data);
} catch (Exception e) {
//ignored
@@ -428,11 +406,10 @@ public class ZooKeeperHiveLockManager im
/* Remove the lock specified */
public void unlock(HiveLock hiveLock) throws LockException {
- unlockWithRetry(ctx.getConf(), zooKeeper, hiveLock, parent);
+ unlockWithRetry(hiveLock, parent);
}
- private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient,
- HiveLock hiveLock, String parent) throws LockException {
+ private void unlockWithRetry(HiveLock hiveLock, String parent) throws LockException {
int tryNum = 0;
do {
@@ -440,14 +417,13 @@ public class ZooKeeperHiveLockManager im
tryNum++;
if (tryNum > 1) {
Thread.sleep(sleepTime);
- prepareRetry();
}
- unlockPrimitive(conf, zkpClient, hiveLock, parent);
+ unlockPrimitive(hiveLock, parent, curatorFramework);
break;
} catch (Exception e) {
if (tryNum >= numRetriesForUnLock) {
String name = ((ZooKeeperHiveLock)hiveLock).getPath();
- LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.");
+ LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.");
throw new LockException(e);
}
}
@@ -458,21 +434,20 @@ public class ZooKeeperHiveLockManager im
/* Remove the lock specified */
@VisibleForTesting
- static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient,
- HiveLock hiveLock, String parent) throws LockException {
+ static void unlockPrimitive(HiveLock hiveLock, String parent, CuratorFramework curatorFramework) throws LockException {
ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
HiveLockObject obj = zLock.getHiveLockObject();
String name = getLastObjectName(parent, obj);
try {
- zkpClient.delete(zLock.getPath(), -1);
+ curatorFramework.delete().forPath(zLock.getPath());
// Delete the parent node if all the children have been deleted
- List<String> children = zkpClient.getChildren(name, false);
+ List<String> children = curatorFramework.getChildren().forPath(name);
if (children == null || children.isEmpty()) {
- zkpClient.delete(name, -1);
+ curatorFramework.delete().forPath(name);
}
} catch (KeeperException.NoNodeException nne) {
- //can happen in retrying deleting the zLock after exceptions like InterruptedException
+ //can happen in retrying deleting the zLock after exceptions like InterruptedException
//or in a race condition where parent has already been deleted by other process when it
//is to be deleted. Both cases should not raise error
LOG.debug("Node " + zLock.getPath() + " or its parent has already been deleted.");
@@ -480,7 +455,7 @@ public class ZooKeeperHiveLockManager im
//can happen in a race condition where another process adds a zLock under this parent
//just before it is about to be deleted. It should not be a problem since this parent
//can eventually be deleted by the process which hold its last child zLock
- LOG.debug("Node " + name + " to be deleted is not empty.");
+ LOG.debug("Node " + name + " to be deleted is not empty.");
} catch (Exception e) {
//exceptions including InterruptException and other KeeperException
LOG.error("Failed to release ZooKeeper lock: ", e);
@@ -490,19 +465,14 @@ public class ZooKeeperHiveLockManager im
/* Release all locks - including PERSISTENT locks */
public static void releaseAllLocks(HiveConf conf) throws Exception {
- ZooKeeper zkpClient = null;
try {
- int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
- String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
- Watcher dummyWatcher = new ZooKeeperHiveHelper.DummyWatcher();
- zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher);
String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
- List<HiveLock> locks = getLocks(conf, zkpClient, null, parent, false, false);
+ List<HiveLock> locks = getLocks(conf, null, parent, false, false);
Exception lastExceptionGot = null;
if (locks != null) {
for (HiveLock lock : locks) {
try {
- unlockPrimitive(conf, zkpClient, lock, parent);
+ unlockPrimitive(lock, parent, curatorFramework);
} catch (Exception e) {
lastExceptionGot = e;
}
@@ -516,24 +486,19 @@ public class ZooKeeperHiveLockManager im
} catch (Exception e) {
LOG.error("Failed to release all locks: ", e);
throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
- } finally {
- if (zkpClient != null) {
- zkpClient.close();
- zkpClient = null;
- }
}
}
/* Get all locks */
public List<HiveLock> getLocks(boolean verifyTablePartition, boolean fetchData)
throws LockException {
- return getLocks(ctx.getConf(), zooKeeper, null, parent, verifyTablePartition, fetchData);
+ return getLocks(ctx.getConf(), null, parent, verifyTablePartition, fetchData);
}
/* Get all locks for a particular object */
public List<HiveLock> getLocks(HiveLockObject key, boolean verifyTablePartitions,
boolean fetchData) throws LockException {
- return getLocks(ctx.getConf(), zooKeeper, key, parent, verifyTablePartitions, fetchData);
+ return getLocks(ctx.getConf(), key, parent, verifyTablePartitions, fetchData);
}
/**
@@ -541,7 +506,7 @@ public class ZooKeeperHiveLockManager im
* @param zkpClient The ZooKeeper client
* @param key The object to be compared against - if key is null, then get all locks
**/
- private static List<HiveLock> getLocks(HiveConf conf, ZooKeeper zkpClient,
+ private static List<HiveLock> getLocks(HiveConf conf,
HiveLockObject key, String parent, boolean verifyTablePartition, boolean fetchData)
throws LockException {
List<HiveLock> locks = new ArrayList<HiveLock>();
@@ -552,12 +517,12 @@ public class ZooKeeperHiveLockManager im
try {
if (key != null) {
commonParent = "/" + parent + "/" + key.getName();
- children = zkpClient.getChildren(commonParent, false);
+ children = curatorFramework.getChildren().forPath(commonParent);
recurse = false;
}
else {
commonParent = "/" + parent;
- children = zkpClient.getChildren(commonParent, false);
+ children = curatorFramework.getChildren().forPath(commonParent);
}
} catch (Exception e) {
// no locks present
@@ -579,7 +544,7 @@ public class ZooKeeperHiveLockManager im
if (recurse) {
try {
- children = zkpClient.getChildren(curChild, false);
+ children = curatorFramework.getChildren().forPath(curChild);
for (String child : children) {
childn.add(curChild + "/" + child);
}
@@ -588,7 +553,7 @@ public class ZooKeeperHiveLockManager im
}
}
- HiveLockMode mode = getLockMode(conf, curChild);
+ HiveLockMode mode = getLockMode(curChild);
if (mode == null) {
continue;
}
@@ -605,8 +570,7 @@ public class ZooKeeperHiveLockManager im
if (fetchData) {
try {
- data = new HiveLockObjectData(new String(zkpClient.getData(curChild,
- new ZooKeeperHiveHelper.DummyWatcher(), null)));
+ data = new HiveLockObjectData(new String(curatorFramework.getData().watched().forPath(curChild)));
data.setClientIp(clientIp);
} catch (Exception e) {
LOG.error("Error in getting data for " + curChild, e);
@@ -623,12 +587,7 @@ public class ZooKeeperHiveLockManager im
/** Remove all redundant nodes **/
private void removeAllRedundantNodes() {
try {
- renewZookeeperInstance(sessionTimeout, quorumServers);
checkRedundantNode("/" + parent);
- if (zooKeeper != null) {
- zooKeeper.close();
- zooKeeper = null;
- }
} catch (Exception e) {
LOG.warn("Exception while removing all redundant nodes", e);
}
@@ -637,19 +596,19 @@ public class ZooKeeperHiveLockManager im
private void checkRedundantNode(String node) {
try {
// Nothing to do if it is a lock mode
- if (getLockMode(ctx.getConf(), node) != null) {
+ if (getLockMode(node) != null) {
return;
}
- List<String> children = zooKeeper.getChildren(node, false);
+ List<String> children = curatorFramework.getChildren().forPath(node);
for (String child : children) {
checkRedundantNode(node + "/" + child);
}
- children = zooKeeper.getChildren(node, false);
+ children = curatorFramework.getChildren().forPath(node);
if ((children == null) || (children.isEmpty()))
{
- zooKeeper.delete(node, -1);
+ curatorFramework.delete().forPath(node);
}
} catch (Exception e) {
LOG.warn("Error in checkRedundantNode for node " + node, e);
@@ -658,12 +617,7 @@ public class ZooKeeperHiveLockManager im
/* Release all transient locks, by simply closing the client */
public void close() throws LockException {
- try {
-
- if (zooKeeper != null) {
- zooKeeper.close();
- zooKeeper = null;
- }
+ try {
if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) {
removeAllRedundantNodes();
@@ -750,7 +704,7 @@ public class ZooKeeperHiveLockManager im
private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$");
/* Get the mode of the lock encoded in the path */
- private static HiveLockMode getLockMode(HiveConf conf, String path) {
+ private static HiveLockMode getLockMode(String path) {
Matcher shMatcher = shMode.matcher(path);
Matcher exMatcher = exMode.matcher(path);
@@ -768,15 +722,6 @@ public class ZooKeeperHiveLockManager im
@Override
public void prepareRetry() throws LockException {
- try {
- if (zooKeeper != null && zooKeeper.getState() == ZooKeeper.States.CLOSED) {
- // Reconnect if the connection is closed.
- zooKeeper = null;
- }
- renewZookeeperInstance(sessionTimeout, quorumServers);
- } catch (Exception e) {
- throw new LockException(e);
- }
}
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java?rev=1651036&r1=1651035&r2=1651036&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java Mon Jan 12 06:08:02 2015
@@ -18,70 +18,82 @@
package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
-import static org.mockito.Mockito.*;
-
-import java.util.Collections;
-
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.After;
import org.junit.Test;
-import com.google.common.base.Joiner;
-
public class TestZookeeperLockManager {
- private static final Joiner SLASH = Joiner.on("/");
- private static final String PARENT = "hive";
- private static final String TABLE = "t1";
- private static final String PARENT_LOCK_PATH = SLASH.join("", PARENT, TABLE);
- private static final String TABLE_LOCK_PATH = SLASH.join("", PARENT, TABLE, "00001");
private HiveConf conf;
- private ZooKeeper zooKeeper;
+ private TestingServer server;
+ private CuratorFramework client;
private HiveLockObject hiveLock;
private ZooKeeperHiveLock zLock;
+ private HiveLockObjectData lockObjData;
+ private static final String PARENT = "hive";
+ private static final String TABLE = "t1";
+ private static final String PARENT_LOCK_PATH = "/hive/t1";
+ private static final String TABLE_LOCK_PATH = "/hive/t1/00001";
@Before
public void setup() {
conf = new HiveConf();
- zooKeeper = mock(ZooKeeper.class);
- hiveLock = mock(HiveLockObject.class);
- when(hiveLock.getName()).thenReturn(TABLE);
+ lockObjData = new HiveLockObjectData("1", "10", "SHARED", "show tables");
+ hiveLock = new HiveLockObject(TABLE, lockObjData);
zLock = new ZooKeeperHiveLock(TABLE_LOCK_PATH, hiveLock, HiveLockMode.SHARED);
- }
- @Test
- public void testDeleteNoChildren() throws Exception {
- ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
- verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
- verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
- verify(zooKeeper).delete(PARENT_LOCK_PATH, -1);
- verifyNoMoreInteractions(zooKeeper);
+ while (server == null)
+ {
+ try {
+ server = new TestingServer();
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+ client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
+ client.start();
+ } catch (Exception e) {
+ System.err.println("Getting bind exception - retrying to allocate server");
+ server = null;
+ }
+ }
}
- /**
- * Tests two threads racing to delete PARENT_LOCK_PATH
- */
- @Test
- public void testDeleteNoChildrenNodeDoesNotExist() throws Exception {
- doThrow(new KeeperException.NoNodeException()).when(zooKeeper).delete(PARENT_LOCK_PATH, -1);
- ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
- verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
- verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
- verify(zooKeeper).delete(PARENT_LOCK_PATH, -1);
- verifyNoMoreInteractions(zooKeeper);
+
+ @After
+ public void teardown() throws Exception
+ {
+ client.close();
+ server.close();
+ server = null;
}
+
@Test
- public void testDeleteWithChildren() throws Exception {
- when(zooKeeper.getChildren(PARENT_LOCK_PATH, false)).thenReturn(Collections.singletonList("somechild"));
- ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
- verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
- verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
- verifyNoMoreInteractions(zooKeeper);
+ public void testDeleteNoChildren() throws Exception
+ {
+ client.create().creatingParentsIfNeeded().forPath(TABLE_LOCK_PATH, lockObjData.toString().getBytes());
+ byte[] data = client.getData().forPath(TABLE_LOCK_PATH);
+ Assert.assertArrayEquals(lockObjData.toString().getBytes(), data);
+ ZooKeeperHiveLockManager.unlockPrimitive(zLock, PARENT, client);
+ try {
+ data = client.getData().forPath(TABLE_LOCK_PATH);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertEquals( e instanceof KeeperException.NoNodeException, true);
+ }
+ try {
+ data = client.getData().forPath(PARENT_LOCK_PATH);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertEquals( e instanceof KeeperException.NoNodeException, true);
+ }
}
@Test
@@ -99,3 +111,4 @@ public class TestZookeeperLockManager {
Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
}
}
+