You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/09/05 18:08:02 UTC

svn commit: r1622731 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: Driver.java session/SessionState.java

Author: hashutosh
Date: Fri Sep  5 16:08:01 2014
New Revision: 1622731

URL: http://svn.apache.org/r1622731
Log:
HIVE-7889 : txnMgr should be session specific (Alan Gates via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1622731&r1=1622730&r2=1622731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Fri Sep  5 16:08:01 2014
@@ -135,7 +135,6 @@ public class Driver implements CommandPr
   private String errorMessage;
   private String SQLState;
   private Throwable downstreamError;
-  private HiveTxnManager txnMgr;
 
   // A limit on the number of threads that can be launched
   private int maxthreads;
@@ -145,16 +144,6 @@ public class Driver implements CommandPr
 
   private String userName;
 
-  private void createTxnManager() throws SemanticException {
-    if (txnMgr == null) {
-      try {
-        txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-      } catch (LockException e) {
-        throw new SemanticException(e.getMessage(), e);
-      }
-    }
-  }
-
   private boolean checkConcurrency() throws SemanticException {
     boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
     if (!supportConcurrency) {
@@ -868,7 +857,7 @@ public class Driver implements CommandPr
   // the input format.
   private int recordValidTxns() {
     try {
-      ValidTxnList txns = txnMgr.getValidTxns();
+      ValidTxnList txns = SessionState.get().getTxnMgr().getValidTxns();
       conf.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
       return 0;
     } catch (LockException e) {
@@ -893,7 +882,7 @@ public class Driver implements CommandPr
 
 
     try {
-      txnMgr.acquireLocks(plan, ctx, userName);
+      SessionState.get().getTxnMgr().acquireLocks(plan, ctx, userName);
       return 0;
     } catch (LockException e) {
       errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -917,7 +906,7 @@ public class Driver implements CommandPr
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
 
     if (hiveLocks != null) {
-      ctx.getHiveTxnManager().getLockManager().releaseLocks(hiveLocks);
+      SessionState.get().getTxnMgr().getLockManager().releaseLocks(hiveLocks);
     }
     ctx.setHiveLocks(null);
 
@@ -1048,9 +1037,14 @@ public class Driver implements CommandPr
 
     boolean requireLock = false;
     boolean ckLock = false;
+    SessionState ss = SessionState.get();
     try {
       ckLock = checkConcurrency();
-      createTxnManager();
+      try {
+        ss.initTxnMgr(conf);
+      } catch (LockException e) {
+        throw new SemanticException(e.getMessage(), e);
+      }
     } catch (SemanticException e) {
       errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
       SQLState = ErrorMsg.findSQLState(e.getMessage());
@@ -1074,7 +1068,7 @@ public class Driver implements CommandPr
     // the reason that we set the txn manager for the cxt here is because each
     // query has its own ctx object. The txn mgr is shared across the
     // same instance of Driver, which can run multiple queries.
-    ctx.setHiveTxnManager(txnMgr);
+    ctx.setHiveTxnManager(ss.getTxnMgr());
 
     if (ckLock) {
       boolean lockOnlyMapred = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
@@ -1670,9 +1664,6 @@ public class Driver implements CommandPr
             e.getMessage());
       }
     }
-    if (txnMgr != null) {
-      txnMgr.closeTxnManager();
-    }
   }
 
   public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1622731&r1=1622730&r2=1622731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Fri Sep  5 16:08:01 2014
@@ -53,6 +53,9 @@ import org.apache.hadoop.hive.ql.exec.te
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.history.HiveHistoryImpl;
 import org.apache.hadoop.hive.ql.history.HiveHistoryProxyHandler;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -211,6 +214,29 @@ public class SessionState {
   private String hdfsScratchDirURIString;
 
   /**
+   * Transaction manager to use for this session.  This is instantiated lazily by
+   * {@link #initTxnMgr(org.apache.hadoop.hive.conf.HiveConf)}
+   */
+  private HiveTxnManager txnMgr = null;
+
+  /**
+   * When {@link #setCurrentTxn(long)} is set to this or {@link #getCurrentTxn()}} returns this it
+   * indicates that there is not a current transaction in this session.
+  */
+  public static final long NO_CURRENT_TXN = -1L;
+
+  /**
+   * Transaction currently open
+   */
+  private long currentTxn = NO_CURRENT_TXN;
+
+  /**
+   * Whether we are in auto-commit state or not.  Currently we are always in auto-commit,
+   * so there are not setters for this yet.
+   */
+  private boolean txnAutoCommit = true;
+
+  /**
    * Get the lineage state stored in this session.
    *
    * @return LineageState
@@ -312,6 +338,37 @@ public class SessionState {
   }
 
   /**
+   * Initialize the transaction manager.  This is done lazily to avoid hard wiring one
+   * transaction manager at the beginning of the session.  In general users shouldn't change
+   * this, but it's useful for testing.
+   * @param conf Hive configuration to initialize transaction manager
+   * @return transaction manager
+   * @throws LockException
+   */
+  public HiveTxnManager initTxnMgr(HiveConf conf) throws LockException {
+    if (txnMgr == null) {
+      txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    }
+    return txnMgr;
+  }
+
+  public HiveTxnManager getTxnMgr() {
+    return txnMgr;
+  }
+
+  public long getCurrentTxn() {
+    return currentTxn;
+  }
+
+  public void setCurrentTxn(long currTxn) {
+    currentTxn = currTxn;
+  }
+
+  public boolean isAutoCommit() {
+    return txnAutoCommit;
+  }
+
+  /**
    * Singleton Session object per thread.
    *
    **/
@@ -1100,6 +1157,7 @@ public class SessionState {
   }
 
   public void close() throws IOException {
+    if (txnMgr != null) txnMgr.closeTxnManager();
     JavaUtils.closeClassLoadersTo(conf.getClassLoader(), parentLoader);
     File resourceDir =
         new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));