You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dh...@apache.org on 2009/01/06 23:03:29 UTC
svn commit: r732142 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/metadata/
ql/src/java/org/apache/hadoop/hive/ql/session/
ql/src/test/org/apache/hadoop/hive/ql/ ql/src/test/queries/clientpositive/
Author: dhruba
Date: Tue Jan 6 14:03:29 2009
New Revision: 732142
URL: http://svn.apache.org/viewvc?rev=732142&view=rev
Log:
HIVE-84. Make MetaStore Client thread safe. (Prasad Chakka via dhruba)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java
hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Jan 6 14:03:29 2009
@@ -176,3 +176,5 @@
HIVE-48. Support JDBC connections for interoperability between
Hive and RDBMS. (Raghotham Murthy and Michi Mutsuzaki via dhruba)
+ HIVE-84. Make MetaStore Client thread safe. (Prasad Chakka via dhruba)
+
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Jan 6 14:03:29 2009
@@ -41,6 +41,15 @@
protected String auxJars;
private static final Log l4j = LogFactory.getLog(HiveConf.class);
+ /**
+ * metastore related options that the db is initialized against
+ */
+ public final static HiveConf.ConfVars [] metaVars = {
+ HiveConf.ConfVars.METASTOREDIRECTORY,
+ HiveConf.ConfVars.METASTOREWAREHOUSE,
+ HiveConf.ConfVars.METASTOREURIS
+ };
+
public static enum ConfVars {
// QL execution stuff
SCRIPTWRAPPER("hive.exec.script.wrapper", null),
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Jan 6 14:03:29 2009
@@ -42,7 +42,6 @@
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
@@ -63,24 +62,56 @@
public class Hive {
static final private Log LOG = LogFactory.getLog("hive.ql.metadata.Hive");
-
- private HiveConf conf;
- private IMetaStoreClient msc;
-
static Hive db = null;
+ private HiveConf conf = null;
+ private ThreadLocal<IMetaStoreClient> threadLocalMSC = new ThreadLocal() {
+ protected synchronized Object initialValue() {
+ return null;
+ }
+
+ public synchronized void remove() {
+ if( this.get() != null ) {
+ ((IMetaStoreClient)this.get()).close();
+ }
+ super.remove();
+ }
+ };
+
/**
- * get
- *
- * @param c
- * @return
+ * Returns hive object for the current thread. If one is not initialized then a new one is created
+ * If the new configuration is different in metadata conf vars then a new one is created.
+ * @param c new Hive Configuration
+ * @return Hive object for current thread
* @exception
*
*/
public static Hive get(HiveConf c) throws HiveException {
- if(db == null) {
- // TODO - this doesn't work if the user switches to a different metadb server
- // during the same session. revisit.
+ boolean needsRefresh = false;
+
+ if(db != null) {
+ for(HiveConf.ConfVars oneVar: HiveConf.metaVars) {
+ String oldVar = db.getConf().getVar(oneVar);
+ String newVar = c.getVar(oneVar);
+ if(oldVar.compareToIgnoreCase(newVar) != 0) {
+ needsRefresh = true;
+ break;
+ }
+ }
+ }
+ return get(c, needsRefresh);
+ }
+
+ /**
+ * get a connection to metastore. see get(HiveConf) function for comments
+ * @param c new conf
+ * @param needsRefresh if true then creates a new one
+ * @return
+ * @throws HiveException
+ */
+ public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
+ if(db == null || needsRefresh) {
+ closeCurrent();
c.set("fs.scheme.class","dfs");
db = new Hive(c);
}
@@ -89,11 +120,17 @@
public static Hive get() throws HiveException {
if(db == null) {
- db = new Hive(new HiveConf(ParseDriver.class));
+ db = new Hive(new HiveConf(Hive.class));
}
return db;
}
+ public static void closeCurrent() {
+ if(db != null) {
+ db.close();
+ }
+ }
+
/**
* Hive
*
@@ -103,22 +140,14 @@
*/
private Hive(HiveConf c) throws HiveException {
this.conf = c;
- try {
- msc = this.createMetaStoreClient();
- } catch (MetaException e) {
- throw new HiveException("Unable to open connection to metastore", e);
- }
}
- public static void closeCurrent() {
- if(Hive.db != null) {
- LOG.info("Closing current connection to Hive Metastore.");
- if(db.msc != null) {
- db.msc.close();
- }
- db.msc = null;
- db = null;
- }
+ /**
+ * closes the connection to metastore for the calling thread
+ */
+ private void close() {
+ LOG.info("Closing current thread's connection to Hive Metastore.");
+ db.threadLocalMSC.remove();
}
/**
@@ -186,7 +215,7 @@
public void alterTable(String tblName,
Table newTbl) throws InvalidOperationException,
MetaException, TException {
- msc.alter_table(MetaStoreUtils.DEFAULT_DATABASE_NAME, tblName, newTbl.getTTable());
+ getMSC().alter_table(MetaStoreUtils.DEFAULT_DATABASE_NAME, tblName, newTbl.getTTable());
}
/**
@@ -201,7 +230,7 @@
tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getName(), tbl.getDeserializer()));
}
tbl.checkValidity();
- msc.createTable(tbl.getTTable());
+ getMSC().createTable(tbl.getTTable());
} catch (HiveException e) {
throw e;
} catch (Exception e) {
@@ -259,7 +288,7 @@
boolean ignoreUnknownTab) throws HiveException {
try {
- msc.dropTable(dbName, tableName, deleteData, ignoreUnknownTab);
+ getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab);
} catch (NoSuchObjectException e) {
if (!ignoreUnknownTab) {
throw new HiveException(e);
@@ -333,7 +362,7 @@
Table table = new Table();
org.apache.hadoop.hive.metastore.api.Table tTable = null;
try {
- tTable = msc.getTable(dbName, tableName);
+ tTable = getMSC().getTable(dbName, tableName);
} catch (NoSuchObjectException e) {
if(throwException) {
LOG.error(StringUtils.stringifyException(e));
@@ -387,7 +416,7 @@
*/
public List<String> getTablesByPattern(String tablePattern) throws HiveException {
try {
- return msc.getTables(MetaStoreUtils.DEFAULT_DATABASE_NAME, tablePattern);
+ return getMSC().getTables(MetaStoreUtils.DEFAULT_DATABASE_NAME, tablePattern);
} catch(Exception e) {
throw new HiveException(e);
}
@@ -396,7 +425,7 @@
// for testing purposes
protected List<String> getTablesForDb(String database, String tablePattern) throws HiveException {
try {
- return msc.getTables(database, tablePattern);
+ return getMSC().getTables(database, tablePattern);
} catch(Exception e) {
throw new HiveException(e);
}
@@ -413,7 +442,7 @@
*/
protected boolean createDatabase(String name, String locationUri) throws AlreadyExistsException,
MetaException, TException {
- return msc.createDatabase(name, locationUri);
+ return getMSC().createDatabase(name, locationUri);
}
/**
@@ -424,7 +453,7 @@
* @see org.apache.hadoop.hive.metastore.HiveMetaStoreClient#dropDatabase(java.lang.String)
*/
protected boolean dropDatabase(String name) throws MetaException, TException {
- return msc.dropDatabase(name);
+ return getMSC().dropDatabase(name);
}
/**
@@ -485,7 +514,7 @@
pvals.add(partSpec.get(field.getName()));
}
try {
- tpart = msc.appendPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);;
+ tpart = getMSC().appendPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);;
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
@@ -516,10 +545,10 @@
}
org.apache.hadoop.hive.metastore.api.Partition tpart = null;
try {
- tpart = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);
+ tpart = getMSC().getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);
if(tpart == null && forceCreate) {
LOG.debug("creating partition for table " + tbl.getName() + " with partition spec : " + partSpec);
- tpart = msc.appendPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);;
+ tpart = getMSC().appendPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);;
}
if(tpart == null){
return null;
@@ -534,7 +563,7 @@
public boolean dropPartition(String db_name, String tbl_name, List<String> part_vals,
boolean deleteData) throws HiveException {
try {
- return msc.dropPartition(db_name, tbl_name, part_vals, deleteData);
+ return getMSC().dropPartition(db_name, tbl_name, part_vals, deleteData);
} catch (NoSuchObjectException e) {
throw new HiveException("Partition or table doesn't exist.", e);
} catch (Exception e) {
@@ -545,7 +574,7 @@
public List<String> getPartitionNames(String dbName, String tblName, short max) throws HiveException {
List names = null;
try {
- names = msc.listPartitionNames(dbName, tblName, max);
+ names = getMSC().listPartitionNames(dbName, tblName, max);
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
@@ -563,7 +592,7 @@
if(tbl.isPartitioned()) {
List<org.apache.hadoop.hive.metastore.api.Partition> tParts;
try {
- tParts = msc.listPartitions(tbl.getDbName(), tbl.getName(), (short) -1);
+ tParts = getMSC().listPartitions(tbl.getDbName(), tbl.getName(), (short) -1);
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
@@ -711,6 +740,19 @@
}
return new MetaStoreClient(this.conf);
}
+
+ /**
+ *
+ * @return the metastore client for the current thread
+ * @throws MetaException
+ */
+ private IMetaStoreClient getMSC() throws MetaException {
+ IMetaStoreClient msc = threadLocalMSC.get();
+ if(msc == null) {
+ msc = this.createMetaStoreClient();
+ }
+ return msc;
+ }
public static List<FieldSchema> getFieldsFromDeserializer(String name, Deserializer serde) throws HiveException {
try {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Tue Jan 6 14:03:29 2009
@@ -92,21 +92,12 @@
this.conf = conf;
this.db = db;
- for(HiveConf.ConfVars oneVar: metaVars) {
+ for(HiveConf.ConfVars oneVar: HiveConf.metaVars) {
dbOptions.put(oneVar, conf.getVar(oneVar));
}
}
/**
- * metastore related options that the db is initialized against
- */
- protected final static HiveConf.ConfVars [] metaVars = {
- HiveConf.ConfVars.METASTOREDIRECTORY,
- HiveConf.ConfVars.METASTOREWAREHOUSE,
- HiveConf.ConfVars.METASTOREURIS
- };
-
- /**
* cached values of such options
*/
private final HashMap<HiveConf.ConfVars, String> dbOptions =
@@ -115,7 +106,7 @@
public Hive getDb() throws HiveException {
boolean needsRefresh = false;
- for(HiveConf.ConfVars oneVar: metaVars) {
+ for(HiveConf.ConfVars oneVar: HiveConf.metaVars) {
if(!StringUtils.isEmpty(StringUtils.difference(dbOptions.get(oneVar), conf.getVar(oneVar)))) {
needsRefresh = true;
break;
@@ -123,7 +114,7 @@
}
if((db == null) || needsRefresh) {
- db = Hive.get(conf);
+ db = Hive.get(conf, needsRefresh);
}
return db;
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java Tue Jan 6 14:03:29 2009
@@ -53,17 +53,14 @@
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde2.thrift.test.Complex;
import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ThriftDeserializer;
+import org.apache.hadoop.hive.serde2.thrift.test.Complex;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import com.facebook.thrift.protocol.TBinaryProtocol;
-import org.apache.hadoop.hive.serde2.ThriftDeserializer;
-import java.nio.channels.FileChannel;
-
-import org.antlr.runtime.tree.*;
public class QTestUtil {
@@ -706,13 +703,6 @@
// assumption is that environment has already been cleaned once globally
// hence each thread does not call cleanUp() and createSources() again
qt.cliInit(fname, false);
- /*
- XXX Ugly hack - uncomment this to test without DDLs.
- Should be removed once DDL/metastore mt issues are resolved
- synchronized (this.getClass()) {
- qt.executeOne(fname);
- }
- */
qt.executeClient(fname);
} catch (Throwable e) {
System.err.println("Query file " + fname + " failed with exception " + e.getMessage());
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java Tue Jan 6 14:03:29 2009
@@ -37,7 +37,7 @@
private String resDir = System.getProperty("ql.test.results.clientpositive.dir");
public void testMTQueries1() throws Exception {
- String[] testNames = new String [] {"join1.q", "join2.q", "groupby1.q", "groupby2.q"};
+ String[] testNames = new String [] {"join1.q", "join2.q", "groupby1.q", "groupby2.q", "join3.q", "input1.q", "input19.q"};
String [] resDirs = new String [testNames.length];
File [] qfiles = new File [testNames.length];
for(int i=0; i<resDirs.length; i++) {
Modified: hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q (original)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q Tue Jan 6 14:03:29 2009
@@ -1,3 +1,4 @@
+drop table apachelog;
create table apachelog(ipaddress STRING,identd STRING,user STRING,finishtime STRING,requestline string,returncode INT,size INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe' WITH SERDEPROPERTIES ( 'serialization.format'= 'org.apache.hadoop.hive.serde2.thrift.TCTLSeparatedProtocol', 'quote.delim'= '("|\\[|\\])', 'field.delim'=' ', 'serialization.null.format'='-' ) STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '../data/files/apache.access.log' INTO TABLE apachelog;
SELECT a.* FROM apachelog a;