You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2011/03/11 07:38:14 UTC
svn commit: r1080462 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
Driver.java exec/DDLTask.java lockmgr/HiveLockManager.java
lockmgr/HiveLockMode.java lockmgr/HiveLockObj.java
lockmgr/zookeeper/ZooKeeperHiveLockManager.java
Author: namit
Date: Fri Mar 11 06:38:13 2011
New Revision: 1080462
URL: http://svn.apache.org/viewvc?rev=1080462&view=rev
Log:
HIVE-2040 The retry logic in Hive's concurrency is not working
correctly (Yongqiang He via namit)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1080462&r1=1080461&r2=1080462&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Fri Mar 11 06:38:13 2011
@@ -657,7 +657,6 @@ public class Driver implements CommandPr
**/
public int acquireReadWriteLocks() {
try {
- int tryNum = 1;
int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
@@ -717,13 +716,31 @@ public class Driver implements CommandPr
}
ctx.setHiveLockMgr(hiveLockMgr);
- List<HiveLock> hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false, numRetries, sleepTime);
+ List<HiveLock> hiveLocks = null;
+
+ int tryNum = 1;
+ do {
+
+ ctx.getHiveLockMgr().prepareRetry();
+ hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false);
+
+ if (hiveLocks != null) {
+ break;
+ }
+ tryNum++;
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ } while (tryNum < numRetries);
+
if (hiveLocks == null) {
throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
} else {
ctx.setHiveLocks(hiveLocks);
}
+
return (0);
} catch (SemanticException e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1080462&r1=1080461&r2=1080462&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Fri Mar 11 06:38:13 2011
@@ -1927,7 +1927,7 @@ public class DDLTask extends Task<DDLWor
"EXPLICIT");
if (partSpec == null) {
- HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true, 0, 0);
+ HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true);
if (lck == null) {
return 1;
}
@@ -1938,7 +1938,7 @@ public class DDLTask extends Task<DDLWor
if (par == null) {
throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist");
}
- HiveLock lck = lockMgr.lock(new HiveLockObject(par, lockData), mode, true, 0, 0);
+ HiveLock lck = lockMgr.lock(new HiveLockObject(par, lockData), mode, true);
if (lck == null) {
return 1;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java?rev=1080462&r1=1080461&r2=1080462&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java Fri Mar 11 06:38:13 2011
@@ -28,17 +28,16 @@ public interface HiveLockManager {
* @param key object to be locked
* @param mode mode of the lock (SHARED/EXCLUSIVE)
* @param keepAlive if the lock needs to be persisted after the statement
- * @param sleepTime
- * @param numRetries
*/
public HiveLock lock(HiveLockObject key, HiveLockMode mode,
- boolean keepAlive, int numRetries, int sleepTime) throws LockException;
+ boolean keepAlive) throws LockException;
public List<HiveLock> lock(List<HiveLockObj> objs,
- boolean keepAlive, int numRetries, int sleepTime) throws LockException;
+ boolean keepAlive) throws LockException;
public void unlock(HiveLock hiveLock) throws LockException;
public void releaseLocks(List<HiveLock> hiveLocks);
public List<HiveLock> getLocks(boolean verifyTablePartitions, boolean fetchData) throws LockException;
public List<HiveLock> getLocks(HiveLockObject key, boolean verifyTablePartitions, boolean fetchData) throws LockException;
public void close() throws LockException;
+ public void prepareRetry() throws LockException;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java?rev=1080462&r1=1080461&r2=1080462&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java Fri Mar 11 06:38:13 2011
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.lockmgr;
-import org.apache.hadoop.hive.conf.HiveConf;
-
public enum HiveLockMode {
SHARED, EXCLUSIVE;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java?rev=1080462&r1=1080461&r2=1080462&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java Fri Mar 11 06:38:13 2011
@@ -18,9 +18,6 @@
package org.apache.hadoop.hive.ql.lockmgr;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-
public class HiveLockObj {
HiveLockObject obj;
HiveLockMode mode;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1080462&r1=1080461&r2=1080462&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Fri Mar 11 06:38:13 2011
@@ -27,21 +27,17 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
import java.util.ArrayList;
-import java.util.Set;
import java.util.Queue;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
import java.util.Comparator;
import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
-import org.apache.commons.lang.StringEscapeUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hive.ql.parse.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
@@ -100,7 +96,7 @@ public class ZooKeeperHiveLockManager im
parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
try {
- String par = zooKeeper.create("/" + parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zooKeeper.create("/" + parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
// ignore if the parent already exists
}
@@ -152,14 +148,12 @@ public class ZooKeeperHiveLockManager im
/**
* @param lockObjects List of objects and the modes of the locks requested
* @param keepAlive Whether the lock is to be persisted after the statement
- * @param numRetries number of retries when the lock can not be acquired
- * @param sleepTime sleep time between retries
*
* Acuire all the locks. Release all the locks and return null if any lock
* could not be acquired.
**/
public List<HiveLock> lock(List<HiveLockObj> lockObjects,
- boolean keepAlive, int numRetries, int sleepTime) throws LockException
+ boolean keepAlive) throws LockException
{
// Sort the objects first. You are guaranteed that if a partition is being locked,
// the table has already been locked
@@ -198,8 +192,7 @@ public class ZooKeeperHiveLockManager im
HiveLock lock = null;
try {
- lock = lock(lockObject.getObj(), lockObject.getMode(), false,
- numRetries, sleepTime, true);
+ lock = lock(lockObject.getObj(), lockObject.getMode(), false, true);
} catch (LockException e) {
console.printError("Error in acquireLocks: "+ e.getLocalizedMessage());
lock = null;
@@ -238,71 +231,45 @@ public class ZooKeeperHiveLockManager im
}
/**
- * @param key The object to be locked
- * @param mode The mode of the lock
- * @param keepAlive Whether the lock is to be persisted after the statement
- * @param numRetries number of retries when the lock can not be acquired
- * @param sleepTime sleep time between retries
- *
- * Acuire the lock. Return null if a conflicting lock is present.
+ * @param key
+ * The object to be locked
+ * @param mode
+ * The mode of the lock
+ * @param keepAlive
+ * Whether the lock is to be persisted after the statement Acuire the
+ * lock. Return null if a conflicting lock is present.
**/
public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode,
- boolean keepAlive, int numRetries, int sleepTime)
- throws LockException {
- return lock(key, mode, keepAlive, numRetries, sleepTime, false);
+ boolean keepAlive) throws LockException {
+ return lock(key, mode, keepAlive, false);
}
/**
- * @param name The name of the zookeeper child
- * @param data The data for the zookeeper child
- * @param mode The mode in which the child needs to be created
- * @param numRetries number of retries if the child cannot be created
- * @param sleepTime sleep time between retries
+ * @param name
+ * The name of the zookeeper child
+ * @param data
+ * The data for the zookeeper child
+ * @param mode
+ * The mode in which the child needs to be created
**/
- private String createChild(String name, byte[] data, CreateMode mode,
- int numRetries, int sleepTime) throws LockException {
+ private String createChild(String name, byte[] data, CreateMode mode) throws LockException {
String res = null;
- int tryNum = 0;
- while (true) {
- String msg = null;
- try {
- res = zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode);
- } catch (KeeperException e) {
- return null;
- // nothing to do if the node already exists
- } catch (Exception e) {
- msg = e.getLocalizedMessage();
- }
-
- if (res != null) {
- return res;
- }
-
- try {
- renewZookeeperInstance(sessionTimeout, quorumServers);
- } catch (Exception e) {
- console.printError("Lock for " + name
- + " cannot be acquired in " + mode);
- throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
- }
-
- if (tryNum == numRetries) {
- console.printError("Lock for " + name
- + " cannot be acquired in " + mode);
- throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
- }
-
- tryNum++;
-
- console.printInfo("Lock for " + name
- + " cannot be acquired in " + mode +
- ", will retry again later..., more info: " + msg);
+ String msg = null;
+ try {
+ res = zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode);
+ } catch (KeeperException e) {
+ return null;
+ // nothing to do if the node already exists
+ } catch (Exception e) {
+ msg = e.getLocalizedMessage();
+ }
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- }
+ if (res == null) {
+ console.printInfo("Lock for " + name + " cannot be acquired in " + mode
+ + ", will retry again later..., more info: " + msg);
}
+
+ return res;
}
private String getLockName(String parent, HiveLockMode mode) {
@@ -310,8 +277,7 @@ public class ZooKeeperHiveLockManager im
}
private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode,
- boolean keepAlive, int numRetries, int sleepTime,
- boolean parentCreated)
+ boolean keepAlive, boolean parentCreated)
throws LockException {
String res;
@@ -331,12 +297,11 @@ public class ZooKeeperHiveLockManager im
// Create the parents first
for (String name : names) {
- res = createChild(name, new byte[0], CreateMode.PERSISTENT, numRetries, sleepTime);
+ res = createChild(name, new byte[0], CreateMode.PERSISTENT);
}
res = createChild(getLockName(lastName, mode), key.getData().toString().getBytes(),
- keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.EPHEMERAL_SEQUENTIAL,
- numRetries, sleepTime);
+ keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.EPHEMERAL_SEQUENTIAL);
int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
if (seqNo == -1) {
@@ -662,4 +627,13 @@ public class ZooKeeperHiveLockManager im
public void process(org.apache.zookeeper.WatchedEvent event) {
}
}
+
+ @Override
+ public void prepareRetry() throws LockException {
+ try {
+ renewZookeeperInstance(sessionTimeout, quorumServers);
+ } catch (Exception e) {
+ throw new LockException(e);
+ }
+ }
}