You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jv...@apache.org on 2010/08/24 17:47:26 UTC
svn commit: r988603 [1/2] - in /hadoop/hive/trunk: ./
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/conf/ conf/ data/conf/
hbase-handler/lib/ hbase-handler/src/test/templates/
hwi/src/test/org/apache/hadoop/hive/hwi/ j...
Author: jvs
Date: Tue Aug 24 15:47:24 2010
New Revision: 988603
URL: http://svn.apache.org/viewvc?rev=988603&view=rev
Log:
HIVE-1293. Concurrency Model for Hive
(Namit Jain via jvs)
Added:
hadoop/hive/trunk/lib/hbase-0.20.3-test.jar (with props)
hadoop/hive/trunk/lib/hbase-0.20.3.jar (with props)
hadoop/hive/trunk/lib/zookeeper-3.2.2.jar (with props)
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java
hadoop/hive/trunk/ql/src/test/queries/clientnegative/lockneg1.q
hadoop/hive/trunk/ql/src/test/queries/clientnegative/lockneg2.q
hadoop/hive/trunk/ql/src/test/queries/clientnegative/lockneg3.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/lock1.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/lock2.q
hadoop/hive/trunk/ql/src/test/results/clientnegative/lockneg1.q.out
hadoop/hive/trunk/ql/src/test/results/clientnegative/lockneg2.q.out
hadoop/hive/trunk/ql/src/test/results/clientnegative/lockneg3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/lock1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/lock2.q.out
Removed:
hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar
hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar
hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/data/conf/hive-site.xml
hadoop/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
hadoop/hive/trunk/hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java
hadoop/hive/trunk/jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm
hadoop/hive/trunk/ql/src/test/templates/TestNegativeCliDriver.vm
hadoop/hive/trunk/ql/src/test/templates/TestParse.vm
hadoop/hive/trunk/ql/src/test/templates/TestParseNegative.vm
hadoop/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java
hadoop/hive/trunk/service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Aug 24 15:47:24 2010
@@ -59,6 +59,9 @@ Trunk - Unreleased
HIVE-1578. Add conf. variable for displaying link to task
with most failures (Paul Yang via namit)
+ HIVE-1293. Concurrency Model for Hive
+ (Namit Jain via jvs)
+
IMPROVEMENTS
HIVE-1394. Do not update transient_lastDdlTime if the partition is modified by a housekeeping
@@ -191,7 +194,7 @@ Trunk - Unreleased
HIVE-1510. HiveCombineInputFormat should not use prefix matching to find
the partitionDesc for a given path
(He Yongqiang via Ning Zhang)
-
+
HIVE-1584. wrong log files in contrib client positive
(namit via He Yongqiang)
Modified: hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (original)
+++ hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java Tue Aug 24 15:47:24 2010
@@ -59,9 +59,9 @@ public class CliDriver {
public static final String prompt = "hive";
public static final String prompt2 = " "; // when ';' is not yet seen
-
+
public static final String HIVERCFILE = ".hiverc";
-
+
private final LogHelper console;
private final Configuration conf;
@@ -130,7 +130,7 @@ public class CliDriver {
}
} else {
- CommandProcessor proc = CommandProcessorFactory.get(tokens[0]);
+ CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf);
if (proc != null) {
if (proc instanceof Driver) {
Driver qp = (Driver) proc;
@@ -201,9 +201,11 @@ public class CliDriver {
lastRet = ret;
boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
if (ret != 0 && !ignoreErrors) {
+ CommandProcessorFactory.clean((HiveConf)conf);
return ret;
}
}
+ CommandProcessorFactory.clean((HiveConf)conf);
return lastRet;
}
@@ -261,7 +263,7 @@ public class CliDriver {
}
ss.setIsSilent(saveSilent);
}
-
+
public static void main(String[] args) throws Exception {
OptionsProcessor oproc = new OptionsProcessor();
@@ -311,7 +313,7 @@ public class CliDriver {
// Execute -i init files (always in silent mode)
cli.processInitFiles(ss);
-
+
if (ss.execString != null) {
System.exit(cli.processLine(ss.execString));
}
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Aug 24 15:47:24 2010
@@ -93,6 +93,7 @@ public class HiveConf extends Configurat
DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
MAXCREATEDFILES("hive.exec.max.created.files", 100000L),
DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
+ DEFAULT_ZOOKEEPER_PARTITION_NAME("hive.lockmgr.zookeeper.default.partition.name", "__HIVE_DEFAULT_ZOOKEEPER_PARTITION__"),
// Whether to show a link to the most failed task + debugging tips
SHOW_JOB_FAIL_DEBUG_INFO("hive.exec.show.job.failure.debug.info", true),
@@ -265,14 +266,25 @@ public class HiveConf extends Configurat
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+ HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false),
+ HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"),
+ HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100),
+ HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60),
+
+ HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", ""),
+ HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", ""),
+ HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000),
+
// For HBase storage handler
HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true),
// For har files
HIVEARCHIVEENABLED("hive.archive.enabled", false),
HIVEHARPARENTDIRSETTABLE("hive.archive.har.parentdir.settable", false),
+
;
+
public final String varname;
public final String defaultVal;
public final int defaultIntVal;
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Tue Aug 24 15:47:24 2010
@@ -607,6 +607,48 @@
</property>
<property>
+ <name>hive.support.concurrency</name>
+ <value>false</value>
+ <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
+</property>
+
+<property>
+ <name>hive.lock.manager</name>
+ <value>org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager</value>
+ <description>The concurrency manager for hive.</description>
+</property>
+
+<property>
+ <name>hive.lock.numretries</name>
+ <value>100</value>
+ <description>The number of times you want to try to get all the locks</description>
+</property>
+
+<property>
+ <name>hive.lock.sleep.between.retries</name>
+ <value>60</value>
+ <description>The sleep time (in seconds) between various retries</description>
+</property>
+
+<property>
+ <name>hive.zookeeper.quorum</name>
+ <value></value>
+ <description>The list of zookeeper servers to talk to. This is only needed for read/write locks.</description>
+</property>
+
+<property>
+ <name>hive.zookeeper.client.port</name>
+ <value>2181</value>
+ <description>The port of zookeeper servers to talk to. This is only needed for read/write locks.</description>
+</property>
+
+<property>
+ <name>hive.zookeeper.session.timeout</name>
+ <value></value>
+ <description>Zookeeper client's session timeout. The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout.</description>
+</property>
+
+<property>
<name>fs.har.impl</name>
<value>org.apache.hadoop.hive.shims.HiveHarFileSystem</value>
<description>The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop vers less than 0.20</description>
Modified: hadoop/hive/trunk/data/conf/hive-site.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/conf/hive-site.xml?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/data/conf/hive-site.xml (original)
+++ hadoop/hive/trunk/data/conf/hive-site.xml Tue Aug 24 15:47:24 2010
@@ -146,6 +146,12 @@
</property>
<property>
+ <name>hive.support.concurrency</name>
+ <value>true</value>
+ <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
+</property>
+
+<property>
<name>fs.pfile.impl</name>
<value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
<description>A proxy for local file system used for cross file system testing</description>
Modified: hadoop/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm Tue Aug 24 15:47:24 2010
@@ -88,6 +88,7 @@ public class $className extends TestCase
try {
System.out.println("Begin query: " + "$fname");
qt.cliInit("$fname");
+ qt.clearTestSideEffects();
int ecode = qt.executeClient("$fname");
if (ecode != 0) {
fail("Client Execution failed with error code = " + ecode);
@@ -112,6 +113,8 @@ public class $className extends TestCase
if (ecode != 0) {
fail("Client execution results failed with error code = " + ecode);
}
+ qt.clearPostTestEffects();
+
} catch (Throwable e) {
System.out.println("Exception: " + e.getMessage());
e.printStackTrace();
Modified: hadoop/hive/trunk/hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java (original)
+++ hadoop/hive/trunk/hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java Tue Aug 24 15:47:24 2010
@@ -86,6 +86,10 @@ public class TestHWISessionManager exten
assertEquals(hsm.findAllSessionsForUser(user2).size(), 1);
assertEquals(hsm.findAllSessionItems().size(), 3);
+ user1_item1.addQuery("set hive.support.concurrency = false");
+ user1_item2.addQuery("set hive.support.concurrency = false");
+ user2_item1.addQuery("set hive.support.concurrency = false");
+
HWISessionItem searchItem = hsm.findSessionItemByName(user1, "session1");
assertEquals(searchItem, user1_item1);
@@ -105,10 +109,12 @@ public class TestHWISessionManager exten
zero.add(0);
zero.add(0);
zero.add(0);
+ zero.add(0);
ArrayList<Integer> zero3 = new ArrayList<Integer>();
zero3.add(0);
zero3.add(0);
zero3.add(0);
+ zero3.add(0);
ArrayList<Integer> zero1 = new ArrayList<Integer>();
zero1.add(0);
assertEquals(zero, searchItem.getQueryRet());
@@ -194,6 +200,7 @@ public class TestHWISessionManager exten
// cleanup
HWISessionItem cleanup = hsm.createSession(user1, "cleanup");
+ cleanup.addQuery("set hive.support.concurrency = false");
cleanup.addQuery("drop table " + tableName);
cleanup.clientStart();
Modified: hadoop/hive/trunk/jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java (original)
+++ hadoop/hive/trunk/jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java Tue Aug 24 15:47:24 2010
@@ -79,6 +79,8 @@ public class TestJdbcDriver extends Test
Statement stmt = con.createStatement();
assertNotNull("Statement is null", stmt);
+ stmt.executeQuery("set hive.support.concurrency = false");
+
// drop table. ignore error.
try {
stmt.executeQuery("drop table " + tableName);
Added: hadoop/hive/trunk/lib/hbase-0.20.3-test.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/lib/hbase-0.20.3-test.jar?rev=988603&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/hive/trunk/lib/hbase-0.20.3-test.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/hive/trunk/lib/hbase-0.20.3.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/lib/hbase-0.20.3.jar?rev=988603&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/hive/trunk/lib/hbase-0.20.3.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/hive/trunk/lib/zookeeper-3.2.2.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/lib/zookeeper-3.2.2.jar?rev=988603&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/hive/trunk/lib/zookeeper-3.2.2.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Tue Aug 24 15:47:24 2010
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.List;
import org.antlr.runtime.TokenRewriteStream;
import org.apache.commons.logging.Log;
@@ -41,6 +42,8 @@ import org.apache.hadoop.fs.ContentSumma
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.conf.Configuration;
/**
@@ -67,7 +70,6 @@ public class Context {
// Keeps track of scratch directories created for different scheme/authority
private final Map<String, String> fsScratchDirs = new HashMap<String, String>();
-
private Configuration conf;
protected int pathid = 10000;
protected boolean explain = false;
@@ -75,6 +77,12 @@ public class Context {
String executionId;
+ // List of Locks for this query
+ protected List<HiveLock> hiveLocks;
+ protected HiveLockManager hiveLockMgr;
+
+ private boolean needLockMgr;
+
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
@@ -86,7 +94,7 @@ public class Context {
public Context(Configuration conf, String executionId) {
this.conf = conf;
this.executionId = executionId;
-
+
// non-local tmp location is configurable. however it is the same across
// all external file systems
nonLocalScratchPath =
@@ -106,7 +114,7 @@ public class Context {
public void setExplain(boolean value) {
explain = value;
}
-
+
/**
* Find whether the current query is an explain query
* @return true if the query is an explain query, false if not
@@ -119,7 +127,7 @@ public class Context {
/**
* Get a tmp directory on specified URI
*
- * @param scheme Scheme of the target FS
+ * @param scheme Scheme of the target FS
* @param authority Authority of the target FS
* @param mkdir create the directory if true
* @param scratchdir path of tmp directory
@@ -166,7 +174,7 @@ public class Context {
/**
* Create a map-reduce scratch directory on demand and return it.
- *
+ *
*/
public String getMRScratchDir() {
@@ -231,7 +239,7 @@ public class Context {
/**
* Get a path to store map-reduce intermediate data in.
- *
+ *
* @return next available path for map-red intermediate data
*/
public String getMRTmpFileURI() {
@@ -241,8 +249,8 @@ public class Context {
/**
- * Given a URI for mapreduce intermediate output, swizzle the
- * it to point to the local file system. This can be called in
+ * Given a URI for mapreduce intermediate output, swizzle the
+ * it to point to the local file system. This can be called in
* case the caller decides to run in local mode (in which case
* all intermediate data can be stored locally)
*
@@ -259,7 +267,7 @@ public class Context {
("Invalid URI: " + originalURI + ", cannot relativize against" +
mrbase.toString());
- return getLocalScratchDir(!explain) + Path.SEPARATOR +
+ return getLocalScratchDir(!explain) + Path.SEPARATOR +
relURI.getPath();
}
@@ -343,6 +351,7 @@ public class Context {
}
removeScratchDir();
originalTracker = null;
+ setNeedLockMgr(false);
}
public DataInput getStream() {
@@ -458,6 +467,22 @@ public class Context {
return HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local");
}
+ public List<HiveLock> getHiveLocks() {
+ return hiveLocks;
+ }
+
+ public void setHiveLocks(List<HiveLock> hiveLocks) {
+ this.hiveLocks = hiveLocks;
+ }
+
+ public HiveLockManager getHiveLockMgr() {
+ return hiveLockMgr;
+ }
+
+ public void setHiveLockMgr(HiveLockManager hiveLockMgr) {
+ this.hiveLockMgr = hiveLockMgr;
+ }
+
public void setOriginalTracker(String originalTracker) {
this.originalTracker = originalTracker;
}
@@ -474,7 +499,7 @@ public class Context {
pathToCS = new HashMap<String, ContentSummary> ();
pathToCS.put(path, cs);
}
-
+
public ContentSummary getCS(String path) {
if(pathToCS == null)
pathToCS = new HashMap<String, ContentSummary> ();
@@ -517,4 +542,12 @@ public class Context {
}
paths.addAll(toAdd);
}
+
+ public boolean isNeedLockMgr() {
+ return needLockMgr;
+ }
+
+ public void setNeedLockMgr(boolean needLockMgr) {
+ this.needLockMgr = needLockMgr;
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Aug 24 15:47:24 2010
@@ -24,6 +24,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -53,6 +55,14 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.hooks.PostExecute;
import org.apache.hadoop.hive.ql.hooks.PreExecute;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
@@ -72,6 +82,10 @@ import org.apache.hadoop.mapred.ClusterS
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
+import org.apache.hadoop.hive.ql.metadata.Table;
public class Driver implements CommandProcessor {
@@ -86,6 +100,7 @@ public class Driver implements CommandPr
private Context ctx;
private QueryPlan plan;
private Schema schema;
+ private HiveLockManager hiveLockMgr;
private String errorMessage;
private String SQLState;
@@ -94,6 +109,40 @@ public class Driver implements CommandPr
private int maxthreads;
private final int sleeptime = 2000;
+ private int checkLockManager() {
+ boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ if (supportConcurrency && (hiveLockMgr == null)) {
+ try {
+ setLockManager();
+ } catch (SemanticException e) {
+ errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
+ SQLState = ErrorMsg.findSQLState(e.getMessage());
+ console.printError(errorMessage, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return (12);
+ }
+ }
+ return (0);
+ }
+
+ private void setLockManager() throws SemanticException {
+ boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ if (supportConcurrency) {
+ String lockMgr = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER);
+ if ((lockMgr == null) || (lockMgr.isEmpty())) {
+ throw new SemanticException(ErrorMsg.LOCKMGR_NOT_SPECIFIED.getMsg());
+ }
+
+ try {
+ hiveLockMgr = (HiveLockManager)
+ ReflectionUtils.newInstance(conf.getClassByName(lockMgr), conf);
+ hiveLockMgr.setContext(new HiveLockManagerCtx(conf));
+ } catch (Exception e) {
+ throw new SemanticException(ErrorMsg.LOCKMGR_NOT_INITIALIZED.getMsg() + e.getMessage());
+ }
+ }
+ }
+
public void init() {
Operator.resetId();
}
@@ -345,20 +394,256 @@ public class Driver implements CommandPr
return plan;
}
+ public static class LockObject {
+ HiveLockObject obj;
+ HiveLockMode mode;
+
+ public LockObject(HiveLockObject obj, HiveLockMode mode) {
+ this.obj = obj;
+ this.mode = mode;
+ }
+
+ public HiveLockObject getObj() {
+ return obj;
+ }
+
+ public HiveLockMode getMode() {
+ return mode;
+ }
+
+ public String getName() {
+ return obj.getName();
+ }
+ }
+
+ /**
+ * Get the list of objects to be locked: the table itself if t is non-null, otherwise partition p
+ * (in the given mode) plus every parent partition prefix and p's table, which are locked in SHARED mode.
+ * @param t The table to be locked; when non-null, p is ignored
+ * @param p The partition to be locked (only consulted when t is null)
+ * @param mode The mode of the lock (SHARED/EXCLUSIVE) for the table or partition itself
+ **/
+ private List<LockObject> getLockObjects(Table t, Partition p, HiveLockMode mode) {
+ List<LockObject> locks = new LinkedList<LockObject>();
+
+ if (t != null) {
+ locks.add(new LockObject(new HiveLockObject(t), mode));
+ return locks;
+ }
+
+ if (p != null) {
+ locks.add(new LockObject(new HiveLockObject(p), mode));
+
+ // All the parents are locked in shared mode
+ mode = HiveLockMode.SHARED;
+
+ String partName = p.getName();
+ String partialName = "";
+ String[] partns = partName.split("/");
+ for (int idx = 0; idx < partns.length -1; idx++) { // all but the last '/'-separated component
+ String partn = partns[idx];
+ partialName += partn; // append only this component; the prefix so far is already in partialName
+ locks.add(new LockObject(new HiveLockObject(new DummyPartition(p.getTable().getDbName() + "@" + p.getTable().getTableName() + "@" + partialName)), mode));
+ partialName += "/";
+ }
+
+ locks.add(new LockObject(new HiveLockObject(p.getTable()), mode));
+ }
+ return locks;
+ }
+
+ /**
+ * Acquire read and write locks needed by the statement. The list of objects to be locked is obtained
+ * from the inputs and outputs populated by the compiler. The lock acquisition scheme is pretty simple.
+ * If all the locks cannot be obtained, error out. Deadlock is avoided by making sure that the locks
+ * are lexicographically sorted.
+ **/
+ public int acquireReadWriteLocks() {
+ try {
+ int tryNum = 1;
+ int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
+ int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
+
+ boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ if (!supportConcurrency) {
+ return 0;
+ }
+
+ List<LockObject> lockObjects = new ArrayList<LockObject>();
+
+ // Sort all the inputs, outputs.
+ // If a lock needs to be acquired on any partition, a read lock needs to be acquired on all its parents also
+ for (ReadEntity input : plan.getInputs()) {
+ if (input.getType() == ReadEntity.Type.TABLE) {
+ lockObjects.addAll(getLockObjects(input.getTable(), null, HiveLockMode.SHARED));
+ }
+ else {
+ lockObjects.addAll(getLockObjects(null, input.getPartition(), HiveLockMode.SHARED));
+ }
+ }
+
+ for (WriteEntity output : plan.getOutputs()) {
+ if (output.getTyp() == WriteEntity.Type.TABLE) {
+ lockObjects.addAll(getLockObjects(output.getTable(), null, HiveLockMode.EXCLUSIVE));
+ }
+ else if (output.getTyp() == WriteEntity.Type.PARTITION) {
+ lockObjects.addAll(getLockObjects(null, output.getPartition(), HiveLockMode.EXCLUSIVE));
+ }
+ }
+
+ if (lockObjects.isEmpty() && !ctx.isNeedLockMgr()) {
+ return 0;
+ }
+
+ int ret = checkLockManager();
+ if (ret != 0) {
+ return ret;
+ }
+
+
+ ctx.setHiveLockMgr(hiveLockMgr);
+
+ Collections.sort(lockObjects, new Comparator<LockObject>() {
+
+ @Override
+ public int compare(LockObject o1, LockObject o2) {
+ int cmp = o1.getName().compareTo(o2.getName());
+ if (cmp == 0) {
+ if (o1.getMode() == o2.getMode()) {
+ return cmp;
+ }
+ // EXCLUSIVE locks occur before SHARED locks
+ if (o1.getMode() == HiveLockMode.EXCLUSIVE) {
+ return -1;
+ }
+ return +1;
+ }
+ return cmp;
+ }
+
+ });
+
+ // walk the list and acquire the locks - if any lock can't be acquired, release all locks, sleep and retry
+ while (true) {
+ List<HiveLock> hiveLocks = acquireLocks(lockObjects);
+
+ if (hiveLocks == null) {
+ if (tryNum == numRetries) {
+ throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
+ }
+ tryNum++;
+
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ else {
+ ctx.setHiveLocks(hiveLocks);
+ break;
+ }
+ }
+ return (0);
+ } catch (SemanticException e) {
+ errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
+ SQLState = ErrorMsg.findSQLState(e.getMessage());
+ console.printError(errorMessage, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return (10);
+ }
+ }
+
+ /**
+ * @param lockObjects The list of objects to be locked
+ * Lock the objects specified in the list. The same object is not locked twice, and the list passed is sorted
+ * such that EXCLUSIVE locks occur before SHARED locks.
+ **/
+ private List<HiveLock> acquireLocks(List<LockObject> lockObjects) throws SemanticException {
+ // walk the list and acquire the locks - if any lock can't be acquired, release the locks taken so far and return null (the caller sleeps and retries)
+ LockObject prevLockObj = null;
+ List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
+
+ for (LockObject lockObject: lockObjects) {
+ // No need to acquire a lock twice on the same object
+ // It is ensured that EXCLUSIVE locks occur before SHARED locks on the same object
+ if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) {
+ prevLockObj = lockObject;
+ continue;
+ }
+
+ HiveLock lock = null;
+ try {
+ lock = ctx.getHiveLockMgr().lock(lockObject.getObj(), lockObject.getMode(), false);
+ } catch (LockException e) {
+ lock = null;
+ }
+
+ if (lock == null) {
+ releaseLocks(hiveLocks);
+ return null;
+ }
+
+ hiveLocks.add(lock);
+ prevLockObj = lockObject;
+ }
+
+ return hiveLocks;
+ }
+
+ /**
+ * Release all the locks acquired implicitly by the statement. Note that the locks acquired
+ * with 'keepAlive' set to True are not released.
+ **/
+ private void releaseLocks() {
+ if (ctx != null && ctx.getHiveLockMgr() != null) {
+ try {
+ ctx.getHiveLockMgr().close();
+ ctx.setHiveLocks(null);
+ } catch (LockException e) {
+ }
+ }
+ }
+
+ /**
+ * @param hiveLocks list of hive locks to be released
+ * Release all the locks specified. If some of the locks have already been released, ignore them
+ **/
+ private void releaseLocks(List<HiveLock> hiveLocks) {
+ if (hiveLocks != null) {
+ for (HiveLock hiveLock: hiveLocks) {
+ try {
+ ctx.getHiveLockMgr().unlock(hiveLock);
+ } catch (LockException e) {
+ // The lock may have been released. Ignore and continue
+ }
+ }
+ ctx.setHiveLocks(null);
+ }
+ }
+
+ /**
+ * Runs compile(), acquireReadWriteLocks() and execute() in turn, stopping at the first step
+ * that returns a non-zero code. On every exit path the locks recorded in the context are
+ * released before the response is returned.
+ *
+ * NOTE(review): ctx is dereferenced even when compile() failed - confirm that compile()
+ * always leaves ctx non-null.
+ *
+ * @param command the command to run
+ * @return a response carrying the return code and, on failure, the error message and SQL state
+ */
public CommandProcessorResponse run(String command) {
errorMessage = null;
SQLState = null;
int ret = compile(command);
if (ret != 0) {
+ releaseLocks(ctx.getHiveLocks());
+ return new CommandProcessorResponse(ret, errorMessage, SQLState);
+ }
+
+ ret = acquireReadWriteLocks();
+ if (ret != 0) {
+ releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret, errorMessage, SQLState);
}
ret = execute();
if (ret != 0) {
+ releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret, errorMessage, SQLState);
}
+ releaseLocks(ctx.getHiveLocks());
return new CommandProcessorResponse(ret);
}
@@ -722,6 +1007,10 @@ public class Driver implements CommandPr
return 0;
}
+ /**
+ * Calls releaseLocks(), which closes the lock manager held by the context (if there is one)
+ * and clears the list of locks kept in the context.
+ */
+ public void destroy() {
+ releaseLocks();
+ }
+
public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan()
throws IOException {
return plan.getQueryPlan();
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Tue Aug 24 15:47:24 2010
@@ -35,6 +35,8 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -84,11 +87,18 @@ import org.apache.hadoop.hive.ql.plan.Dr
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
import org.apache.hadoop.hive.ql.plan.MsckDesc;
import org.apache.hadoop.hive.ql.plan.ShowFunctionsDesc;
+import org.apache.hadoop.hive.ql.plan.ShowLocksDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc;
import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc;
import org.apache.hadoop.hive.ql.plan.ShowTablesDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
@@ -225,6 +235,21 @@ public class DDLTask extends Task<DDLWor
return showFunctions(showFuncs);
}
+ ShowLocksDesc showLocks = work.getShowLocksDesc();
+ if (showLocks != null) {
+ return showLocks(showLocks);
+ }
+
+ LockTableDesc lockTbl = work.getLockTblDesc();
+ if (lockTbl != null) {
+ return lockTable(lockTbl);
+ }
+
+ UnlockTableDesc unlockTbl = work.getUnlockTblDesc();
+ if (unlockTbl != null) {
+ return unlockTable(unlockTbl);
+ }
+
ShowPartitionsDesc showParts = work.getShowPartsDesc();
if (showParts != null) {
return showPartitions(db, showParts);
@@ -1130,6 +1155,163 @@ public class DDLTask extends Task<DDLWor
}
/**
+   * Write a list of the current locks to a file.
+   * The locks are sorted by the name of the locked object; for the same object EXCLUSIVE
+   * locks are listed before SHARED locks. Each lock is written as the object name, the
+   * separator, the lock mode and the terminator.
+   *
+   * @param showLocks
+   *          the locks we're interested in.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException
+   *           Throws this exception if an unexpected error occurs.
+   */
+  private int showLocks(ShowLocksDesc showLocks) throws HiveException {
+    Context ctx = driverContext.getCtx();
+    HiveLockManager lockMgr = ctx.getHiveLockMgr();
+    if (lockMgr == null) {
+      throw new HiveException("show Locks LockManager not specified");
+    }
+
+    // write the results in the file
+    FSDataOutputStream outStream = null;
+    try {
+      Path resFile = new Path(showLocks.getResFile());
+      FileSystem fs = resFile.getFileSystem(conf);
+      outStream = fs.create(resFile);
+      List<HiveLock> locks = lockMgr.getLocks();
+
+      Collections.sort(locks, new Comparator<HiveLock>() {
+
+        @Override
+        public int compare(HiveLock o1, HiveLock o2) {
+          int cmp = o1.getHiveLockObject().getName().compareTo(o2.getHiveLockObject().getName());
+          if (cmp == 0) {
+            if (o1.getHiveLockMode() == o2.getHiveLockMode()) {
+              return cmp;
+            }
+            // EXCLUSIVE locks occur before SHARED locks
+            if (o1.getHiveLockMode() == HiveLockMode.EXCLUSIVE) {
+              return -1;
+            }
+            return +1;
+          }
+          return cmp;
+        }
+
+      });
+
+      for (HiveLock lock : locks) {
+        outStream.writeBytes(lock.getHiveLockObject().getName());
+        outStream.write(separator);
+        outStream.writeBytes(lock.getHiveLockMode().toString());
+        outStream.write(terminator);
+      }
+
+      // Close inside the try so that a failure to flush the result is reported as an error
+      outStream.close();
+      outStream = null;
+    } catch (FileNotFoundException e) {
+      LOG.warn("show locks: " + stringifyException(e));
+      return 1;
+    } catch (IOException e) {
+      LOG.warn("show locks: " + stringifyException(e));
+      return 1;
+    } catch (Exception e) {
+      throw new HiveException(e.toString(), e);
+    } finally {
+      // Only reached with an open stream when something above failed; do not leak it
+      if (outStream != null) {
+        try {
+          outStream.close();
+        } catch (IOException ignored) {
+          // The original failure is the one being reported
+        }
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Lock the table/partition specified. The lock is requested with keepAlive set to true.
+   *
+   * @param lockTbl
+   *          the table/partition to be locked along with the mode
+   * @return Returns 0 when the lock manager returns a lock and 1 when it returns none.
+   * @throws HiveException
+   *           Throws this exception if no lock manager is specified, if the table or the
+   *           partition does not exist, or if an unexpected error occurs.
+   */
+  private int lockTable(LockTableDesc lockTbl) throws HiveException {
+    Context ctx = driverContext.getCtx();
+    HiveLockManager lockMgr = ctx.getHiveLockMgr();
+    if (lockMgr == null) {
+      throw new HiveException("lock Table LockManager not specified");
+    }
+
+    HiveLockMode mode = HiveLockMode.valueOf(lockTbl.getMode());
+    String tabName = lockTbl.getTableName();
+    Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tabName);
+    if (tbl == null) {
+      throw new HiveException("Table " + tabName + " does not exist ");
+    }
+
+    // Without a partition specification the table itself is locked
+    Map<String, String> partSpec = lockTbl.getPartSpec();
+    HiveLockObject target;
+    if (partSpec == null) {
+      target = new HiveLockObject(tbl);
+    } else {
+      Partition par = db.getPartition(tbl, partSpec, false);
+      if (par == null) {
+        throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist");
+      }
+      target = new HiveLockObject(par);
+    }
+
+    HiveLock lck = lockMgr.lock(target, mode, true);
+    return (lck == null) ? 1 : 0;
+  }
+
+  /**
+   * Unlock the table/partition specified. Every lock that the lock manager reports for the
+   * object is removed.
+   *
+   * @param unlockTbl
+   *          the table/partition to be unlocked
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException
+   *           Throws this exception if no lock manager is specified, if the table or the
+   *           partition does not exist, if the object is not locked, or if an unexpected
+   *           error occurs.
+   */
+  private int unlockTable(UnlockTableDesc unlockTbl) throws HiveException {
+    Context ctx = driverContext.getCtx();
+    HiveLockManager lockMgr = ctx.getHiveLockMgr();
+    if (lockMgr == null) {
+      throw new HiveException("unlock Table LockManager not specified");
+    }
+
+    String tabName = unlockTbl.getTableName();
+    Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tabName);
+    if (tbl == null) {
+      throw new HiveException("Table " + tabName + " does not exist ");
+    }
+
+    // Without a partition specification the table itself is unlocked
+    Map<String, String> partSpec = unlockTbl.getPartSpec();
+    HiveLockObject obj;
+    if (partSpec != null) {
+      Partition par = db.getPartition(tbl, partSpec, false);
+      if (par == null) {
+        throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist");
+      }
+      obj = new HiveLockObject(par);
+    } else {
+      obj = new HiveLockObject(tbl);
+    }
+
+    List<HiveLock> locks = lockMgr.getLocks(obj);
+    if ((locks == null) || locks.isEmpty()) {
+      throw new HiveException("Table " + tabName + " is not locked ");
+    }
+
+    for (HiveLock lock : locks) {
+      lockMgr.unlock(lock);
+    }
+
+    return 0;
+  }
+
+ /**
* Shows a description of a function.
*
* @param descFunc
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+/**
+ * A lock on a Hive object, described by the object that is locked and the mode of the lock.
+ */
+public abstract class HiveLock {
+ /** @return the object this lock is for */
+ public abstract HiveLockObject getHiveLockObject();
+ /** @return the mode of this lock */
+ public abstract HiveLockMode getHiveLockMode();
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import java.util.List;
+
+/**
+ * Operations offered by a lock manager: acquiring, releasing and listing locks on Hive objects.
+ */
+public interface HiveLockManager {
+
+ /**
+ * @param ctx the context (holding the Hive configuration) this lock manager is to use
+ */
+ public void setContext(HiveLockManagerCtx ctx) throws LockException;
+
+ /**
+ * Lock the object specified in the mode specified.
+ *
+ * @param key object to be locked
+ * @param mode mode of the lock (SHARED/EXCLUSIVE)
+ * @param keepAlive if the lock needs to be persisted after the statement
+ * @return the lock acquired. NOTE(review): the callers added with this interface treat a
+ * null return as "lock not granted" - confirm that this is part of the contract
+ */
+ public HiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive)
+ throws LockException;
+ /** @param hiveLock the lock to be released */
+ public void unlock(HiveLock hiveLock) throws LockException;
+
+ /** @return all the locks known to this lock manager */
+ public List<HiveLock> getLocks() throws LockException;
+ /** @param key the object whose locks are to be returned */
+ public List<HiveLock> getLocks(HiveLockObject key) throws LockException;
+ /** Shut down this lock manager. */
+ public void close() throws LockException;
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * The context handed to a lock manager. It holds the Hive configuration.
+ */
+public class HiveLockManagerCtx {
+  HiveConf conf;
+
+  /** Create a context without a configuration; it can be supplied later with setConf. */
+  public HiveLockManagerCtx() {
+    this(null);
+  }
+
+  /** @param conf the Hive configuration to hold */
+  public HiveLockManagerCtx(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  /** @param conf the Hive configuration to hold from now on */
+  public void setConf(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  /** @return the Hive configuration held, which may be null */
+  public HiveConf getConf() {
+    return conf;
+  }
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * The modes in which a lock can be requested.
+ */
+public enum HiveLockMode {
+ SHARED, EXCLUSIVE;
+}
+
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+/**
+ * The object a lock is for: either a table or a partition.
+ */
+public class HiveLockObject {
+  /**
+   * The table.
+   */
+  private Table t;
+
+  /**
+   * The partition. This is null for a non partitioned table.
+   */
+  private Partition p;
+
+  /** Create a lock object that refers to neither a table nor a partition yet. */
+  public HiveLockObject() {
+  }
+
+  /** @param t the table to be locked */
+  public HiveLockObject(Table t) {
+    this.t = t;
+  }
+
+  /** @param p the partition to be locked */
+  public HiveLockObject(Partition p) {
+    this.p = p;
+  }
+
+  public Table getTable() {
+    return t;
+  }
+
+  public void setTable (Table t) {
+    this.t = t;
+  }
+
+  public Partition getPartition() {
+    return p;
+  }
+
+  public void setPartition (Partition p) {
+    this.p = p;
+  }
+
+  /**
+   * @return the complete name of the table if a table is set, otherwise the complete name of
+   *         the partition
+   */
+  public String getName() {
+    return (t != null) ? t.getCompleteName() : p.getCompleteName();
+  }
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Exception from lock manager. It is a HiveException, so code that handles HiveException
+ * handles it as well.
+ */
+
+public class LockException extends HiveException {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Create an exception without a message or a cause. */
+ public LockException() {
+ super();
+ }
+
+ /** @param message description of the failure */
+ public LockException(String message) {
+ super(message);
+ }
+
+ /** @param cause the failure that led to this exception */
+ public LockException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param message description of the failure
+ * @param cause the failure that led to this exception
+ */
+ public LockException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive Lock Manager interfaces and some custom implementations
+ */
+package org.apache.hadoop.hive.ql.lockmgr;
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
+
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+
+/**
+ * A lock kept in ZooKeeper: the locked object and the lock mode, together with the path of
+ * the ZooKeeper node that stands for the lock.
+ */
+public class ZooKeeperHiveLock extends HiveLock {
+  private String path;
+  private HiveLockObject obj;
+  private HiveLockMode mode;
+
+  /**
+   * @param path path of the ZooKeeper node that stands for the lock
+   * @param obj the object that is locked
+   * @param mode the mode of the lock
+   */
+  public ZooKeeperHiveLock(String path, HiveLockObject obj, HiveLockMode mode) {
+    this.path = path;
+    this.obj = obj;
+    this.mode = mode;
+  }
+
+  /** @return path of the ZooKeeper node that stands for the lock */
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  @Override
+  public HiveLockObject getHiveLockObject() {
+    return obj;
+  }
+
+  public void setHiveLockObject(HiveLockObject obj) {
+    this.obj = obj;
+  }
+
+  @Override
+  public HiveLockMode getHiveLockMode() {
+    return mode;
+  }
+
+  public void setHiveLockMode(HiveLockMode mode) {
+    this.mode = mode;
+  }
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+import org.apache.commons.lang.StringEscapeUtils;
+
+import org.apache.hadoop.hive.ql.parse.ErrorMsg;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
+public class ZooKeeperHiveLockManager implements HiveLockManager {
+ HiveLockManagerCtx ctx;
+ public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager");
+ static final private LogHelper console = new LogHelper(LOG);
+
+ private ZooKeeper zooKeeper;
+
+ public ZooKeeperHiveLockManager() {
+ }
+
+  /**
+   * Get the quorum server address from the configuration. The format is:
+   * host1:port,host2:port..
+   * The quorum is a comma separated list of hosts. The client port is appended to every host
+   * that does not specify a port of its own, not just to the last one.
+   *
+   * @param conf The hive configuration
+   * @return the connect string to be passed to the ZooKeeper client
+   **/
+  private static String getQuorumServers(HiveConf conf) {
+    String quorum = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM);
+    String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT);
+    String[] hosts = (quorum == null ? "" : quorum).split(",");
+
+    StringBuilder servers = new StringBuilder();
+    for (int i = 0; i < hosts.length; i++) {
+      String host = hosts[i].trim();
+      if (i > 0) {
+        servers.append(",");
+      }
+      servers.append(host);
+      if (host.indexOf(':') < 0) {
+        servers.append(":").append(port);
+      }
+    }
+    return servers.toString();
+  }
+
+  /**
+   * Start the ZooKeeper client based on the zookeeper cluster specified in the conf.
+   * A client that is already open is closed first.
+   *
+   * @param ctx The lock manager context (containing the Hive configuration file)
+   * @throws LockException if the client could not be created; the underlying failure is
+   *           attached as the cause
+   **/
+  public void setContext(HiveLockManagerCtx ctx) throws LockException {
+    this.ctx = ctx;
+    HiveConf conf = ctx.getConf();
+    int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+    String quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf);
+
+    try {
+      if (zooKeeper != null) {
+        zooKeeper.close();
+      }
+
+      zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher());
+    } catch (Exception e) {
+      LOG.error("Failed to create ZooKeeper object: " + e);
+      // Keep the root cause, the message alone does not say what went wrong
+      throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg(), e);
+    }
+  }
+
+  /**
+   * Build the prefix of the ZooKeeper node name for a lock on the object in the given mode:
+   * "/", the object name, "-", the mode and "-".
+   *
+   * Since partition names can contain "/", which need all the parent directories to be created
+   * by ZooKeeper, replace "/" by a dummy name to ensure a single hierarchy. The dummy name is
+   * inserted literally, so characters that are special in a regular expression replacement
+   * ("$" and "\") are safe in the configured value.
+   *
+   * @param key The object to be locked
+   * @param mode The mode of the lock
+   **/
+  private String getObjectName(HiveLockObject key, HiveLockMode mode) {
+    String dummyName = ctx.getConf().getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME);
+    return "/" + key.getName().replace("/", dummyName) + "-" + mode + "-";
+  }
+
+ /**
+ * Acquire the lock. Return null if a conflicting lock is present.
+ *
+ * A sequential node is created for the request first; afterwards the children of the root
+ * are scanned for a conflicting node on the same object with a lower sequence number. If
+ * there is one, the node just created is deleted again and null is returned.
+ *
+ * @param key The object to be locked
+ * @param mode The mode of the lock
+ * @param keepAlive Whether the lock is to be persisted after the statement
+ * @throws LockException if any ZooKeeper operation fails. The node created for the request
+ * is not deleted on this path.
+ **/
+ public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive)
+ throws LockException {
+ String name = getObjectName(key, mode);
+ String res;
+
+ try {
+ // A lock that is to be kept alive gets a persistent node, any other lock an ephemeral one
+ if (keepAlive) {
+ res = zooKeeper.create(name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+ }
+ else {
+ res = zooKeeper.create(name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ }
+
+ // The sequence number is what follows the requested name in the path that was created
+ int seqNo = getSequenceNumber(res, name);
+ if (seqNo == -1) {
+ zooKeeper.delete(res, -1);
+ return null;
+ }
+
+ List<String> children = zooKeeper.getChildren("/", false);
+
+ String exLock = getObjectName(key, HiveLockMode.EXCLUSIVE);
+ String shLock = getObjectName(key, HiveLockMode.SHARED);
+
+ for (String child : children) {
+ child = "/" + child;
+
+ // Is there a conflicting lock on the same object with a lower sequence number
+ // An EXCLUSIVE lock always conflicts; a SHARED lock only conflicts with a request for an
+ // EXCLUSIVE lock. For any other child childSeq stays equal to seqNo, i.e. no conflict.
+ int childSeq = seqNo;
+ if (child.startsWith(exLock)) {
+ childSeq = getSequenceNumber(child, exLock);
+ }
+ if ((mode == HiveLockMode.EXCLUSIVE) && child.startsWith(shLock)) {
+ childSeq = getSequenceNumber(child, shLock);
+ }
+
+ if ((childSeq >= 0) && (childSeq < seqNo)) {
+ zooKeeper.delete(res, -1);
+ console.printError("conflicting lock present ");
+ return null;
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Failed to get ZooKeeper lock: " + e);
+ throw new LockException(e);
+ }
+
+ return new ZooKeeperHiveLock(res, key, mode);
+ }
+
+ /**
+ * Remove the lock specified, using the ZooKeeper client of this lock manager.
+ *
+ * @param hiveLock The lock to be removed
+ */
+ public void unlock(HiveLock hiveLock) throws LockException {
+ unlock(ctx.getConf(), zooKeeper, hiveLock);
+ }
+
+ /**
+ * Remove the lock specified by deleting the ZooKeeper node at the path stored in the lock.
+ *
+ * @param conf Hive configuration (not used by this method)
+ * @param zkpClient The ZooKeeper client used to delete the node
+ * @param hiveLock The lock to be removed; it is cast to ZooKeeperHiveLock
+ * @throws LockException if the node could not be deleted
+ */
+ private static void unlock(HiveConf conf, ZooKeeper zkpClient, HiveLock hiveLock) throws LockException {
+ ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
+ try {
+ // Version -1: delete the node whatever its current version is
+ zkpClient.delete(zLock.getPath(), -1);
+ } catch (Exception e) {
+ LOG.error("Failed to release ZooKeeper lock: " + e);
+ throw new LockException(e);
+ }
+ }
+
+  /**
+   * Release all locks - including PERSISTENT locks.
+   * A ZooKeeper client of its own is opened for this and closed again before returning, also
+   * when listing or removing the locks fails.
+   *
+   * @param conf Hive configuration, naming the ZooKeeper quorum and the session timeout
+   * @throws Exception if the client could not be created or a lock could not be listed or
+   *           removed; the underlying failure is attached as the cause
+   */
+  public static void releaseAllLocks(HiveConf conf) throws Exception {
+    try {
+      int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+      String quorumServers = getQuorumServers(conf);
+      ZooKeeper zkpClient = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher());
+      try {
+        List<HiveLock> locks = getLocks(conf, zkpClient, null);
+        if (locks != null) {
+          for (HiveLock lock : locks) {
+            unlock(conf, zkpClient, lock);
+          }
+        }
+      } finally {
+        // Do not leave the client open when a lock could not be listed or removed
+        zkpClient.close();
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to release all locks: " + e.getMessage());
+      throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg(), e);
+    }
+  }
+
+ /**
+ * Get all locks, using the ZooKeeper client of this lock manager.
+ */
+ public List<HiveLock> getLocks() throws LockException {
+ return getLocks(ctx.getConf(), zooKeeper, null);
+ }
+
+ /**
+ * Get all locks for a particular object, using the ZooKeeper client of this lock manager.
+ *
+ * @param key The object whose locks are wanted; locks are matched by the name of the object
+ */
+ public List<HiveLock> getLocks(HiveLockObject key) throws LockException {
+ return getLocks(ctx.getConf(), zooKeeper, key);
+ }
+
+  /**
+   * List the locks found among the children of the ZooKeeper root. A child whose name does
+   * not encode a lock mode is skipped.
+   *
+   * @param conf Hive configuration
+   * @param zkpClient The ZooKeeper client
+   * @param key The object to be compared against - if key is null, then get all locks
+   * @return the locks found; the list is empty when there is none
+   **/
+  private static List<HiveLock> getLocks(HiveConf conf, ZooKeeper zkpClient, HiveLockObject key) throws LockException {
+    List<String> nodes;
+    try {
+      nodes = zkpClient.getChildren("/", false);
+    } catch (Exception e) {
+      LOG.error("Failed to get ZooKeeper children: " + e);
+      throw new LockException(e);
+    }
+
+    List<HiveLock> found = new ArrayList<HiveLock>();
+    for (String node : nodes) {
+      String path = "/" + node;
+
+      HiveLockMode mode = getLockMode(conf, path);
+      if (mode != null) {
+        HiveLockObject obj = getLockObject(conf, path, mode);
+        boolean wanted = (key == null) || obj.getName().equals(key.getName());
+        if (wanted) {
+          found.add((HiveLock) new ZooKeeperHiveLock(path, obj, mode));
+        }
+      }
+    }
+
+    return found;
+  }
+
+  /**
+   * Release all transient locks, by simply closing the client.
+   * Nothing is done when there is no client.
+   */
+  public void close() throws LockException {
+    if (zooKeeper == null) {
+      return;
+    }
+
+    try {
+      zooKeeper.close();
+      zooKeeper = null;
+    } catch (Exception e) {
+      LOG.error("Failed to close zooKeeper client: " + e);
+      throw new LockException(e);
+    }
+  }
+
+  /**
+   * Get the sequence number from the path. The sequence number is always at the end of the path.
+   *
+   * @param resPath the complete path, which must be at least as long as path
+   * @param path the part of resPath that precedes the sequence number
+   * @return the sequence number, or -1 if what follows path in resPath is not a number
+   **/
+  private int getSequenceNumber(String resPath, String path) {
+    String tst = resPath.substring(path.length());
+    try {
+      return Integer.parseInt(tst);
+    } catch (NumberFormatException e) {
+      return -1; // invalid number
+    }
+  }
+
+  /**
+   * Get the object from the path of the lock.
+   * The object may correspond to a table, a partition or a parent to a partition.
+   * For eg: if Table T is partitioned by ds, hr and ds=1/hr=1 is a valid partition,
+   * the lock may also correspond to T@ds=1, which is not a valid object; a DummyPartition
+   * carrying the name is returned in that case.
+   *
+   * @param conf Hive configuration
+   * @param path path of the lock node
+   * @param mode mode of the lock, as encoded in the path
+   * @throws LockException if the table named in the path does not exist, or if the path
+   *           cannot be mapped to an object for any other reason
+   **/
+  private static HiveLockObject getLockObject(HiveConf conf, String path, HiveLockMode mode) throws LockException {
+    try {
+      Hive db = Hive.get(conf);
+      String dummyName = conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME);
+      int indx = path.lastIndexOf(mode.toString());
+      String objName = path.substring(1, indx-1);
+
+      String[] names = objName.split("@");
+      Table tab = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+          names[1], false); // do not throw exception if table does not exist
+      // An explicit check instead of an assert: asserts are disabled by default, and a null
+      // table would only fail later, when the name of the lock object is asked for
+      if (tab == null) {
+        throw new LockException("Table " + names[1] + " of lock " + path + " does not exist");
+      }
+
+      if (names.length == 2) {
+        return new HiveLockObject(tab);
+      }
+
+      // The dummy name is configured text, not a regular expression
+      String[] parts = names[2].split(Pattern.quote(dummyName));
+
+      Map<String, String> partSpec = new HashMap<String, String>();
+      for (String part : parts) {
+        String[] partVals = part.split("=");
+        partSpec.put(partVals[0], partVals[1]);
+      }
+
+      Partition partn;
+      try {
+        partn = db.getPartition(tab, partSpec, false);
+      } catch (HiveException e) {
+        partn = null;
+      }
+
+      if (partn == null) {
+        return new HiveLockObject(new DummyPartition(objName.replace(dummyName, "/")));
+      }
+
+      return new HiveLockObject(partn);
+    } catch (LockException e) {
+      // Already describes the problem; do not wrap it a second time
+      LOG.error("Failed to create ZooKeeper object: " + e);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to create ZooKeeper object: " + e);
+      throw new LockException(e);
+    }
+  }
+
+ private static Pattern shMode = Pattern.compile("^.*-(SHARED)-([0-9]+)$");
+ private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$");
+
+  /**
+   * Get the mode of the lock encoded in the path.
+   *
+   * @param conf Hive configuration (not used by this method)
+   * @param path path of the lock node
+   * @return the mode, or null if the path does not end in a lock mode followed by a number
+   */
+  private static HiveLockMode getLockMode(HiveConf conf, String path) {
+    if (shMode.matcher(path).matches()) {
+      return HiveLockMode.SHARED;
+    }
+
+    if (exMode.matcher(path).matches()) {
+      return HiveLockMode.EXCLUSIVE;
+    }
+
+    return null;
+  }
+
+ /**
+ * A watcher that ignores every event it is given.
+ */
+ public static class DummyWatcher implements Watcher {
+ public void process(org.apache.zookeeper.WatchedEvent event) {
+ }
+ }
+}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java?rev=988603&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java Tue Aug 24 15:47:24 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A placeholder {@link Partition} that is identified only by a name.
+ *
+ * Currently, Hive does not support hierarchical partitions - for example, if partition
+ * ds=1, hr=1 exists, there is no way to access ds=1. Hierarchical partitions are needed
+ * in some cases, e.g. locking. For now, a dummy partition is created to satisfy this.
+ */
+public class DummyPartition extends Partition {
+
+  @SuppressWarnings("nls")
+  private static final Log LOG = LogFactory
+      .getLog("hive.ql.metadata.DummyPartition");
+
+  private String name;
+
+  public DummyPartition() {
+  }
+
+  /**
+   * @param name
+   *          the value returned by {@link #getName()} and {@link #getCompleteName()}
+   */
+  public DummyPartition(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the name exactly as stored; nothing is prepended to it
+   */
+  @Override
+  public String getCompleteName() {
+    return getName();
+  }
+}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Tue Aug 24 15:47:24 2010
@@ -520,4 +520,12 @@ public class Partition implements Serial
ProtectMode mode = getProtectMode();
return (!mode.offline && !mode.readOnly);
}
+
+ /**
+ * @return include the db name
+ */
+ public String getCompleteName() {
+ return getTable().getCompleteName() + "@" + getName();
+ }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Tue Aug 24 15:47:24 2010
@@ -794,4 +794,11 @@ public class Table implements Serializab
ProtectMode mode = getProtectMode();
return (!mode.offline && !mode.readOnly);
}
+
+ /**
+ * @return include the db name
+ */
+ public String getCompleteName() {
+ return getDbName() + "@" + getTableName();
+ }
};
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Tue Aug 24 15:47:24 2010
@@ -69,6 +69,9 @@ import org.apache.hadoop.hive.ql.plan.Sh
import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc;
import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc;
import org.apache.hadoop.hive.ql.plan.ShowTablesDesc;
+import org.apache.hadoop.hive.ql.plan.ShowLocksDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.serde.Constants;
@@ -131,7 +134,7 @@ public class DDLSemanticAnalyzer extends
super(conf);
// Partition can't have this name
reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME));
-
+ reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME));
// Partition value can't end in this suffix
reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ORIGINAL));
reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ARCHIVED));
@@ -171,6 +174,9 @@ public class DDLSemanticAnalyzer extends
} else if (ast.getToken().getType() == HiveParser.TOK_SHOWFUNCTIONS) {
ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
analyzeShowFunctions(ast);
+ } else if (ast.getToken().getType() == HiveParser.TOK_SHOWLOCKS) {
+ ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
+ analyzeShowLocks(ast);
} else if (ast.getToken().getType() == HiveParser.TOK_DESCFUNCTION) {
ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
analyzeDescFunction(ast);
@@ -212,6 +218,10 @@ public class DDLSemanticAnalyzer extends
} else if (ast.getToken().getType() == HiveParser.TOK_SHOWPARTITIONS) {
ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
analyzeShowPartitions(ast);
+ } else if (ast.getToken().getType() == HiveParser.TOK_LOCKTABLE) {
+ analyzeLockTable(ast);
+ } else if (ast.getToken().getType() == HiveParser.TOK_UNLOCKTABLE) {
+ analyzeUnlockTable(ast);
} else {
throw new SemanticException("Unsupported command.");
}
@@ -808,6 +818,84 @@ public class DDLSemanticAnalyzer extends
/**
* Add the task according to the parsed command tree. This is used for the CLI
+ * command "SHOW LOCKS;".
+ *
+ * @param ast
+ * The parsed command tree.
+ * @throws SemanticException
+ * Parsing failed
+ */
+ private void analyzeShowLocks(ASTNode ast) throws SemanticException {
+   // The descriptor is built around the context's result file.
+   ShowLocksDesc desc = new ShowLocksDesc(ctx.getResFile());
+   DDLWork work = new DDLWork(getInputs(), getOutputs(), desc);
+   rootTasks.add(TaskFactory.get(work, conf));
+
+   // The fetch task uses the schema declared by the descriptor.
+   setFetchTask(createFetchTask(desc.getSchema()));
+
+   // Need to initialize the lock manager
+   ctx.setNeedLockMgr(true);
+ }
+
+ /**
+ * Add the task according to the parsed command tree. This is used for the CLI
+ * command "LOCK TABLE ..;".
+ *
+ * @param ast
+ * The parsed command tree.
+ * @throws SemanticException
+ * Parsing failed
+ */
+ private void analyzeLockTable(ASTNode ast)
+     throws SemanticException {
+   // Child 0 is read as the table name (lower-cased), child 1 as the lock mode (upper-cased).
+   String tableName = unescapeIdentifier(ast.getChild(0).getText().toLowerCase());
+   String mode = unescapeIdentifier(ast.getChild(1).getText().toUpperCase());
+   List<Map<String, String>> partSpecs = getPartitionSpecs(ast);
+
+   // At most one partition spec is expected; none means the spec passed on is null.
+   assert(partSpecs.size() <= 1);
+   Map<String, String> partSpec = partSpecs.isEmpty() ? null : partSpecs.get(0);
+
+   DDLWork work = new DDLWork(getInputs(), getOutputs(),
+       new LockTableDesc(tableName, mode, partSpec));
+   rootTasks.add(TaskFactory.get(work, conf));
+
+   // Need to initialize the lock manager
+   ctx.setNeedLockMgr(true);
+ }
+
+ /**
+ * Add the task according to the parsed command tree. This is used for the CLI
+ * command "UNLOCK TABLE ..;".
+ *
+ * @param ast
+ * The parsed command tree.
+ * @throws SemanticException
+ * Parsing failed
+ */
+ private void analyzeUnlockTable(ASTNode ast)
+     throws SemanticException {
+   // Child 0 is read as the table name (lower-cased).
+   String tableName = unescapeIdentifier(ast.getChild(0).getText().toLowerCase());
+   List<Map<String, String>> partSpecs = getPartitionSpecs(ast);
+
+   // At most one partition spec is expected; none means the spec passed on is null.
+   assert(partSpecs.size() <= 1);
+   Map<String, String> partSpec = partSpecs.isEmpty() ? null : partSpecs.get(0);
+
+   DDLWork work = new DDLWork(getInputs(), getOutputs(),
+       new UnlockTableDesc(tableName, partSpec));
+   rootTasks.add(TaskFactory.get(work, conf));
+
+   // Need to initialize the lock manager
+   ctx.setNeedLockMgr(true);
+ }
+
+ /**
+ * Add the task according to the parsed command tree. This is used for the CLI
* command "DESCRIBE FUNCTION;".
*
* @param ast
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java Tue Aug 24 15:47:24 2010
@@ -150,6 +150,10 @@ public enum ErrorMsg {
+ "STRING instead."),
CREATE_NON_NATIVE_AS("CREATE TABLE AS SELECT cannot be used for a non-native table"),
LOAD_INTO_NON_NATIVE("A non-native table cannot be used as target for LOAD"),
+ LOCKMGR_NOT_SPECIFIED("lock manager not specified correctly, set hive.lock.manager"),
+ LOCKMGR_NOT_INITIALIZED("lock manager could not be initialized, check hive.lock.manager "),
+ LOCK_CANNOT_BE_ACQUIRED("locks on the underlying objects cannot be acquired. retry after some time"),
+ ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED("Check hive.zookeeper.quorum and hive.zookeeper.client.port"),
OVERWRITE_ARCHIVED_PART("Cannot overwrite an archived partition. " +
"Unarchive before running this command."),
ARCHIVE_METHODS_DISABLED("Archiving methods are currently disabled. " +
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Tue Aug 24 15:47:24 2010
@@ -117,6 +117,9 @@ TOK_SHOWTABLES;
TOK_SHOWFUNCTIONS;
TOK_SHOWPARTITIONS;
TOK_SHOW_TABLESTATUS;
+TOK_SHOWLOCKS;
+TOK_LOCKTABLE;
+TOK_UNLOCKTABLE;
TOK_DROPTABLE;
TOK_TABCOLLIST;
TOK_TABCOL;
@@ -237,6 +240,8 @@ ddlStatement
| dropIndexStatement
| alterIndexRebuild
| dropFunctionStatement
+ | lockStatement
+ | unlockStatement
;
ifNotExists
@@ -577,6 +582,25 @@ showStatement
| KW_SHOW KW_PARTITIONS Identifier partitionSpec? -> ^(TOK_SHOWPARTITIONS Identifier partitionSpec?)
| KW_SHOW KW_TABLE KW_EXTENDED ((KW_FROM|KW_IN) db_name=Identifier)? KW_LIKE showStmtIdentifier partitionSpec?
-> ^(TOK_SHOW_TABLESTATUS showStmtIdentifier $db_name? partitionSpec?)
+ | KW_SHOW KW_LOCKS -> ^(TOK_SHOWLOCKS)
+ ;
+
+// Syntax: LOCK TABLE Identifier [partitionSpec] lockMode
+// The rewritten tree is ^(TOK_LOCKTABLE Identifier lockMode partitionSpec?): the lock
+// mode is placed before the optional partition spec, unlike the input order.
+lockStatement
+@init { msgs.push("lock statement"); }
+@after { msgs.pop(); }
+ : KW_LOCK KW_TABLE Identifier partitionSpec? lockMode -> ^(TOK_LOCKTABLE Identifier lockMode partitionSpec?)
+ ;
+
+// Lock mode of a lock statement: either the KW_SHARED or the KW_EXCLUSIVE keyword.
+lockMode
+@init { msgs.push("lock mode"); }
+@after { msgs.pop(); }
+ : KW_SHARED | KW_EXCLUSIVE
+ ;
+
+// Syntax: UNLOCK TABLE Identifier [partitionSpec]
+// The rewritten tree is ^(TOK_UNLOCKTABLE Identifier partitionSpec?).
+unlockStatement
+@init { msgs.push("unlock statement"); }
+@after { msgs.pop(); }
+ : KW_UNLOCK KW_TABLE Identifier partitionSpec? -> ^(TOK_UNLOCKTABLE Identifier partitionSpec?)
;
metastoreCheck
@@ -1012,7 +1036,7 @@ selectClause
@init { msgs.push("select clause"); }
@after { msgs.pop(); }
:
- KW_SELECT hintClause? (((KW_ALL | dist=KW_DISTINCT)? selectList)
+ KW_SELECT hintClause? (((KW_ALL | dist=KW_DISTINCT)? selectList)
| (transform=KW_TRANSFORM selectTrfmClause))
-> {$transform == null && $dist == null}? ^(TOK_SELECT hintClause? selectList)
-> {$transform == null && $dist != null}? ^(TOK_SELECTDI hintClause? selectList)
@@ -1027,7 +1051,7 @@ selectList
:
selectItem ( COMMA selectItem )* -> selectItem+
;
-
+
selectTrfmClause
@init { msgs.push("transform clause"); }
@after { msgs.pop(); }
@@ -1770,7 +1794,10 @@ KW_REVOKE: 'REVOKE';
KW_SSL: 'SSL';
KW_UNDO: 'UNDO';
KW_LOCK: 'LOCK';
+KW_LOCKS: 'LOCKS';
KW_UNLOCK: 'UNLOCK';
+KW_SHARED: 'SHARED';
+KW_EXCLUSIVE: 'EXCLUSIVE';
KW_PROCEDURE: 'PROCEDURE';
KW_UNSIGNED: 'UNSIGNED';
KW_WHILE: 'WHILE';
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java?rev=988603&r1=988602&r2=988603&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java Tue Aug 24 15:47:24 2010
@@ -56,6 +56,7 @@ public final class SemanticAnalyzerFacto
commandType.put(HiveParser.TOK_SHOW_TABLESTATUS, "SHOW_TABLESTATUS");
commandType.put(HiveParser.TOK_SHOWFUNCTIONS, "SHOWFUNCTIONS");
commandType.put(HiveParser.TOK_SHOWPARTITIONS, "SHOWPARTITIONS");
+ commandType.put(HiveParser.TOK_SHOWLOCKS, "SHOWLOCKS");
commandType.put(HiveParser.TOK_CREATEFUNCTION, "CREATEFUNCTION");
commandType.put(HiveParser.TOK_DROPFUNCTION, "DROPFUNCTION");
commandType.put(HiveParser.TOK_CREATEVIEW, "CREATEVIEW");
@@ -65,6 +66,8 @@ public final class SemanticAnalyzerFacto
commandType.put(HiveParser.TOK_ALTERINDEX_REBUILD, "ALTERINDEX_REBUILD");
commandType.put(HiveParser.TOK_ALTERVIEW_PROPERTIES, "ALTERVIEW_PROPERTIES");
commandType.put(HiveParser.TOK_QUERY, "QUERY");
+ commandType.put(HiveParser.TOK_LOCKTABLE, "LOCKTABLE");
+ commandType.put(HiveParser.TOK_UNLOCKTABLE, "UNLOCKTABLE");
}
static {
@@ -109,12 +112,15 @@ public final class SemanticAnalyzerFacto
case HiveParser.TOK_SHOW_TABLESTATUS:
case HiveParser.TOK_SHOWFUNCTIONS:
case HiveParser.TOK_SHOWPARTITIONS:
+ case HiveParser.TOK_SHOWLOCKS:
case HiveParser.TOK_CREATEINDEX:
case HiveParser.TOK_DROPINDEX:
case HiveParser.TOK_ALTERTABLE_CLUSTER_SORT:
case HiveParser.TOK_ALTERTABLE_TOUCH:
case HiveParser.TOK_ALTERTABLE_ARCHIVE:
case HiveParser.TOK_ALTERTABLE_UNARCHIVE:
+ case HiveParser.TOK_LOCKTABLE:
+ case HiveParser.TOK_UNLOCKTABLE:
return new DDLSemanticAnalyzer(conf);
case HiveParser.TOK_ALTERTABLE_PARTITION:
String commandType = null;