You are viewing a plain-text version of this content. The hyperlink to its canonical version (originally attached to the word "here") was not preserved in this copy.
Posted to commits@hive.apache.org by dh...@apache.org on 2009/01/06 23:03:29 UTC

svn commit: r732142 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/metadata/ ql/src/java/org/apache/hadoop/hive/ql/session/ ql/src/test/org/apache/hadoop/hive/ql/ ql/src/test/queries/clientpositive/

Author: dhruba
Date: Tue Jan  6 14:03:29 2009
New Revision: 732142

URL: http://svn.apache.org/viewvc?rev=732142&view=rev
Log:
HIVE-84.  Make MetaStore Client thread safe.  (Prasad Chakka via dhruba)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Jan  6 14:03:29 2009
@@ -176,3 +176,5 @@
     HIVE-48.  Support JDBC connections for interoperability between 
     Hive and RDBMS.  (Raghotham Murthy and Michi Mutsuzaki via dhruba)
 
+    HIVE-84.  Make MetaStore Client thread safe.  (Prasad Chakka via dhruba)
+

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Jan  6 14:03:29 2009
@@ -41,6 +41,15 @@
   protected String auxJars;
   private static final Log l4j = LogFactory.getLog(HiveConf.class);
 
+  /**
+   * metastore related options that the db is initialized against
+   */
+  public final static HiveConf.ConfVars [] metaVars = {
+    HiveConf.ConfVars.METASTOREDIRECTORY,
+    HiveConf.ConfVars.METASTOREWAREHOUSE,
+    HiveConf.ConfVars.METASTOREURIS
+  };
+
   public static enum ConfVars {
     // QL execution stuff
     SCRIPTWRAPPER("hive.exec.script.wrapper", null),

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Jan  6 14:03:29 2009
@@ -42,7 +42,6 @@
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.ql.parse.ParseDriver;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
@@ -63,24 +62,56 @@
 public class Hive {
 
   static final private Log LOG = LogFactory.getLog("hive.ql.metadata.Hive");
-
-  private HiveConf conf;
-  private IMetaStoreClient msc;
-
   static Hive db = null;
   
+  private HiveConf conf = null;
+  private ThreadLocal<IMetaStoreClient> threadLocalMSC = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+        return null;
+    }
+    
+    public synchronized void remove() {
+      if( this.get() != null ) {
+        ((IMetaStoreClient)this.get()).close();
+      }
+      super.remove();
+    }
+  };
+  
   /**
-   * get
-   *
-   * @param c
-   * @return
+   * Returns hive object for the current thread. If one is not initialized then a new one is created 
+   * If the new configuration is different in metadata conf vars then a new one is created.
+   * @param c new Hive Configuration
+   * @return Hive object for current thread
    * @exception
    *
    */
   public static Hive get(HiveConf c) throws HiveException {
-    if(db == null) {
-      // TODO - this doesn't work if the user switches to a different metadb server
-      // during the same session. revisit.
+    boolean needsRefresh = false;
+
+    if(db != null) {
+      for(HiveConf.ConfVars oneVar: HiveConf.metaVars) {
+        String oldVar = db.getConf().getVar(oneVar);
+        String newVar = c.getVar(oneVar);
+        if(oldVar.compareToIgnoreCase(newVar) != 0) {
+          needsRefresh = true;
+          break;
+        }
+      }
+    }
+    return get(c, needsRefresh);
+  }
+
+  /**
+   * get a connection to metastore. see get(HiveConf) function for comments
+   * @param c new conf
+   * @param needsRefresh if true then creates a new one
+   * @return
+   * @throws HiveException
+   */
+  public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
+    if(db == null || needsRefresh) {
+      closeCurrent();
       c.set("fs.scheme.class","dfs");
       db = new Hive(c);
     }
@@ -89,11 +120,17 @@
 
   public static Hive get() throws HiveException {
     if(db == null) {
-      db = new Hive(new HiveConf(ParseDriver.class));
+      db = new Hive(new HiveConf(Hive.class));
     }
     return db;
   }
   
+  public static void closeCurrent() {
+    if(db != null) {
+      db.close();
+    }
+  }
+  
   /**
    * Hive
    *
@@ -103,22 +140,14 @@
    */
   private Hive(HiveConf c) throws  HiveException {
     this.conf = c;
-    try {
-       msc = this.createMetaStoreClient();
-    } catch (MetaException e) {
-      throw new HiveException("Unable to open connection to metastore", e);
-    }
   }
   
-  public static void closeCurrent() {
-    if(Hive.db != null) {
-      LOG.info("Closing current connection to Hive Metastore.");
-      if(db.msc != null) {
-        db.msc.close();
-      }
-      db.msc = null;
-      db = null;
-    }
+  /**
+   * closes the connection to metastore for the calling thread
+   */
+  private void close() {
+    LOG.info("Closing current thread's connection to Hive Metastore.");
+    db.threadLocalMSC.remove();
   }
   
   /**
@@ -186,7 +215,7 @@
   public void alterTable(String tblName,
       Table newTbl) throws InvalidOperationException,
       MetaException, TException {
-    msc.alter_table(MetaStoreUtils.DEFAULT_DATABASE_NAME, tblName, newTbl.getTTable());
+    getMSC().alter_table(MetaStoreUtils.DEFAULT_DATABASE_NAME, tblName, newTbl.getTTable());
   }
 
   /**
@@ -201,7 +230,7 @@
         tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getName(), tbl.getDeserializer()));
       }
       tbl.checkValidity();
-      msc.createTable(tbl.getTTable());
+      getMSC().createTable(tbl.getTTable());
     } catch (HiveException e) {
       throw e;
     } catch (Exception e) {
@@ -259,7 +288,7 @@
       boolean ignoreUnknownTab) throws HiveException {
     
     try {
-      msc.dropTable(dbName, tableName, deleteData, ignoreUnknownTab);
+      getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab);
     } catch (NoSuchObjectException e) {
       if (!ignoreUnknownTab) {
         throw new HiveException(e);
@@ -333,7 +362,7 @@
     Table table = new Table();
     org.apache.hadoop.hive.metastore.api.Table tTable = null;
     try {
-      tTable = msc.getTable(dbName, tableName);
+      tTable = getMSC().getTable(dbName, tableName);
     } catch (NoSuchObjectException e) {
       if(throwException) {
         LOG.error(StringUtils.stringifyException(e));
@@ -387,7 +416,7 @@
    */
   public List<String> getTablesByPattern(String tablePattern) throws HiveException {
     try {
-      return msc.getTables(MetaStoreUtils.DEFAULT_DATABASE_NAME, tablePattern);
+      return getMSC().getTables(MetaStoreUtils.DEFAULT_DATABASE_NAME, tablePattern);
     } catch(Exception e) {
       throw new HiveException(e);
     }
@@ -396,7 +425,7 @@
   // for testing purposes
   protected List<String> getTablesForDb(String database, String tablePattern) throws HiveException {
     try {
-      return msc.getTables(database, tablePattern);
+      return getMSC().getTables(database, tablePattern);
     } catch(Exception e) {
       throw new HiveException(e);
     }
@@ -413,7 +442,7 @@
    */
   protected boolean createDatabase(String name, String locationUri) throws AlreadyExistsException,
       MetaException, TException {
-    return msc.createDatabase(name, locationUri);
+    return getMSC().createDatabase(name, locationUri);
   }
 
   /**
@@ -424,7 +453,7 @@
    * @see org.apache.hadoop.hive.metastore.HiveMetaStoreClient#dropDatabase(java.lang.String)
    */
   protected boolean dropDatabase(String name) throws MetaException, TException {
-    return msc.dropDatabase(name);
+    return getMSC().dropDatabase(name);
   }
 
   /**
@@ -485,7 +514,7 @@
       pvals.add(partSpec.get(field.getName()));
     }
     try {
-      tpart = msc.appendPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);;
+      tpart = getMSC().appendPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);;
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
@@ -516,10 +545,10 @@
     }
     org.apache.hadoop.hive.metastore.api.Partition tpart = null;
     try {
-      tpart = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);
+      tpart = getMSC().getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);
       if(tpart == null && forceCreate) {
         LOG.debug("creating partition for table "  + tbl.getName() + " with partition spec : " + partSpec);
-        tpart = msc.appendPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);;
+        tpart = getMSC().appendPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), pvals);;
       }
       if(tpart == null){
         return null;
@@ -534,7 +563,7 @@
   public boolean dropPartition(String db_name, String tbl_name, List<String> part_vals,
       boolean deleteData) throws HiveException {
     try {
-      return msc.dropPartition(db_name, tbl_name, part_vals, deleteData);
+      return getMSC().dropPartition(db_name, tbl_name, part_vals, deleteData);
     } catch (NoSuchObjectException e) {
       throw new HiveException("Partition or table doesn't exist.", e);
     } catch (Exception e) {
@@ -545,7 +574,7 @@
   public List<String> getPartitionNames(String dbName, String tblName, short max) throws HiveException {
     List names = null;
     try {
-      names = msc.listPartitionNames(dbName, tblName, max);
+      names = getMSC().listPartitionNames(dbName, tblName, max);
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
@@ -563,7 +592,7 @@
     if(tbl.isPartitioned()) {
       List<org.apache.hadoop.hive.metastore.api.Partition> tParts;
       try {
-        tParts = msc.listPartitions(tbl.getDbName(), tbl.getName(), (short) -1);
+        tParts = getMSC().listPartitions(tbl.getDbName(), tbl.getName(), (short) -1);
       } catch (Exception e) {
         LOG.error(StringUtils.stringifyException(e));
         throw new HiveException(e);
@@ -711,6 +740,19 @@
     }
     return new MetaStoreClient(this.conf);
   }
+  
+  /**
+   * 
+   * @return the metastore client for the current thread
+   * @throws MetaException
+   */
+  private IMetaStoreClient getMSC() throws MetaException {
+    IMetaStoreClient msc = threadLocalMSC.get();
+    if(msc == null) {
+      msc = this.createMetaStoreClient();
+    }
+    return msc;
+  }
 
   public static List<FieldSchema> getFieldsFromDeserializer(String name, Deserializer serde) throws HiveException {
     try {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Tue Jan  6 14:03:29 2009
@@ -92,21 +92,12 @@
     this.conf = conf;
     this.db = db;
 
-    for(HiveConf.ConfVars oneVar: metaVars) {
+    for(HiveConf.ConfVars oneVar: HiveConf.metaVars) {
       dbOptions.put(oneVar, conf.getVar(oneVar));
     }
   }
 
   /**
-   * metastore related options that the db is initialized against
-   */
-  protected final static HiveConf.ConfVars [] metaVars = {
-    HiveConf.ConfVars.METASTOREDIRECTORY,
-    HiveConf.ConfVars.METASTOREWAREHOUSE,
-    HiveConf.ConfVars.METASTOREURIS
-  };
-
-  /**
    * cached values of such options
    */
   private final HashMap<HiveConf.ConfVars, String> dbOptions =
@@ -115,7 +106,7 @@
   public Hive getDb() throws HiveException {
     boolean needsRefresh = false;
 
-    for(HiveConf.ConfVars oneVar: metaVars) {
+    for(HiveConf.ConfVars oneVar: HiveConf.metaVars) {
       if(!StringUtils.isEmpty(StringUtils.difference(dbOptions.get(oneVar), conf.getVar(oneVar)))) {
         needsRefresh = true;
         break;
@@ -123,7 +114,7 @@
     }
     
     if((db == null) || needsRefresh) {
-      db = Hive.get(conf);
+      db = Hive.get(conf, needsRefresh);
     }
   
     return db;

Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java Tue Jan  6 14:03:29 2009
@@ -53,17 +53,14 @@
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde2.thrift.test.Complex;
 import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ThriftDeserializer;
+import org.apache.hadoop.hive.serde2.thrift.test.Complex;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import com.facebook.thrift.protocol.TBinaryProtocol;
-import org.apache.hadoop.hive.serde2.ThriftDeserializer;
-import java.nio.channels.FileChannel;
-
-import org.antlr.runtime.tree.*;
 
 public class QTestUtil {
 
@@ -706,13 +703,6 @@
         // assumption is that environment has already been cleaned once globally
         // hence each thread does not call cleanUp() and createSources() again
         qt.cliInit(fname, false);
-        /*
-          XXX Ugly hack - uncomment this to test without DDLs.
-          Should be removed once DDL/metastore mt issues are resolved
-          synchronized (this.getClass()) {
-          qt.executeOne(fname);
-          }
-        */
         qt.executeClient(fname);
       } catch (Throwable e) {
         System.err.println("Query file " + fname + " failed with exception " + e.getMessage());

Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java Tue Jan  6 14:03:29 2009
@@ -37,7 +37,7 @@
   private String resDir = System.getProperty("ql.test.results.clientpositive.dir");
 
   public void testMTQueries1()  throws Exception {
-    String[] testNames = new String [] {"join1.q", "join2.q", "groupby1.q", "groupby2.q"};
+    String[] testNames = new String [] {"join1.q", "join2.q", "groupby1.q", "groupby2.q", "join3.q", "input1.q", "input19.q"};
     String [] resDirs = new String [testNames.length];
     File [] qfiles = new File [testNames.length];
     for(int i=0; i<resDirs.length; i++) {

Modified: hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q?rev=732142&r1=732141&r2=732142&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q (original)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/input19.q Tue Jan  6 14:03:29 2009
@@ -1,3 +1,4 @@
+drop table apachelog;
 create table apachelog(ipaddress STRING,identd STRING,user STRING,finishtime STRING,requestline string,returncode INT,size INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe' WITH SERDEPROPERTIES (  'serialization.format'= 'org.apache.hadoop.hive.serde2.thrift.TCTLSeparatedProtocol',  'quote.delim'= '("|\\[|\\])',  'field.delim'=' ',  'serialization.null.format'='-'  ) STORED AS TEXTFILE;
 LOAD DATA LOCAL INPATH '../data/files/apache.access.log' INTO TABLE apachelog;
 SELECT a.* FROM apachelog a;