You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/09/05 18:08:02 UTC
svn commit: r1622731 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
Driver.java session/SessionState.java
Author: hashutosh
Date: Fri Sep 5 16:08:01 2014
New Revision: 1622731
URL: http://svn.apache.org/r1622731
Log:
HIVE-7889 : txnMgr should be session specific (Alan Gates via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1622731&r1=1622730&r2=1622731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Fri Sep 5 16:08:01 2014
@@ -135,7 +135,6 @@ public class Driver implements CommandPr
private String errorMessage;
private String SQLState;
private Throwable downstreamError;
- private HiveTxnManager txnMgr;
// A limit on the number of threads that can be launched
private int maxthreads;
@@ -145,16 +144,6 @@ public class Driver implements CommandPr
private String userName;
- private void createTxnManager() throws SemanticException {
- if (txnMgr == null) {
- try {
- txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- } catch (LockException e) {
- throw new SemanticException(e.getMessage(), e);
- }
- }
- }
-
private boolean checkConcurrency() throws SemanticException {
boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
if (!supportConcurrency) {
@@ -868,7 +857,7 @@ public class Driver implements CommandPr
// the input format.
private int recordValidTxns() {
try {
- ValidTxnList txns = txnMgr.getValidTxns();
+ ValidTxnList txns = SessionState.get().getTxnMgr().getValidTxns();
conf.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
return 0;
} catch (LockException e) {
@@ -893,7 +882,7 @@ public class Driver implements CommandPr
try {
- txnMgr.acquireLocks(plan, ctx, userName);
+ SessionState.get().getTxnMgr().acquireLocks(plan, ctx, userName);
return 0;
} catch (LockException e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -917,7 +906,7 @@ public class Driver implements CommandPr
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
if (hiveLocks != null) {
- ctx.getHiveTxnManager().getLockManager().releaseLocks(hiveLocks);
+ SessionState.get().getTxnMgr().getLockManager().releaseLocks(hiveLocks);
}
ctx.setHiveLocks(null);
@@ -1048,9 +1037,14 @@ public class Driver implements CommandPr
boolean requireLock = false;
boolean ckLock = false;
+ SessionState ss = SessionState.get();
try {
ckLock = checkConcurrency();
- createTxnManager();
+ try {
+ ss.initTxnMgr(conf);
+ } catch (LockException e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
} catch (SemanticException e) {
errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
SQLState = ErrorMsg.findSQLState(e.getMessage());
@@ -1074,7 +1068,7 @@ public class Driver implements CommandPr
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
- ctx.setHiveTxnManager(txnMgr);
+ ctx.setHiveTxnManager(ss.getTxnMgr());
if (ckLock) {
boolean lockOnlyMapred = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
@@ -1670,9 +1664,6 @@ public class Driver implements CommandPr
e.getMessage());
}
}
- if (txnMgr != null) {
- txnMgr.closeTxnManager();
- }
}
public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1622731&r1=1622730&r2=1622731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Fri Sep 5 16:08:01 2014
@@ -53,6 +53,9 @@ import org.apache.hadoop.hive.ql.exec.te
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.history.HiveHistoryImpl;
import org.apache.hadoop.hive.ql.history.HiveHistoryProxyHandler;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -211,6 +214,29 @@ public class SessionState {
private String hdfsScratchDirURIString;
/**
+ * Transaction manager to use for this session. This is instantiated lazily by
+ * {@link #initTxnMgr(org.apache.hadoop.hive.conf.HiveConf)}
+ */
+ private HiveTxnManager txnMgr = null;
+
+ /**
+ * When {@link #setCurrentTxn(long)} is set to this or {@link #getCurrentTxn()}} returns this it
+ * indicates that there is not a current transaction in this session.
+ */
+ public static final long NO_CURRENT_TXN = -1L;
+
+ /**
+ * Transaction currently open
+ */
+ private long currentTxn = NO_CURRENT_TXN;
+
+ /**
+ * Whether we are in auto-commit state or not. Currently we are always in auto-commit,
+ * so there are not setters for this yet.
+ */
+ private boolean txnAutoCommit = true;
+
+ /**
* Get the lineage state stored in this session.
*
* @return LineageState
@@ -312,6 +338,37 @@ public class SessionState {
}
/**
+ * Initialize the transaction manager. This is done lazily to avoid hard wiring one
+ * transaction manager at the beginning of the session. In general users shouldn't change
+ * this, but it's useful for testing.
+ * @param conf Hive configuration to initialize transaction manager
+ * @return transaction manager
+ * @throws LockException
+ */
+ public HiveTxnManager initTxnMgr(HiveConf conf) throws LockException {
+ if (txnMgr == null) {
+ txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ }
+ return txnMgr;
+ }
+
+ public HiveTxnManager getTxnMgr() {
+ return txnMgr;
+ }
+
+ public long getCurrentTxn() {
+ return currentTxn;
+ }
+
+ public void setCurrentTxn(long currTxn) {
+ currentTxn = currTxn;
+ }
+
+ public boolean isAutoCommit() {
+ return txnAutoCommit;
+ }
+
+ /**
* Singleton Session object per thread.
*
**/
@@ -1100,6 +1157,7 @@ public class SessionState {
}
public void close() throws IOException {
+ if (txnMgr != null) txnMgr.closeTxnManager();
JavaUtils.closeClassLoadersTo(conf.getClassLoader(), parentLoader);
File resourceDir =
new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));