You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/06/20 19:33:32 UTC
svn commit: r1604222 - in
/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn:
CompactionTxnHandler.java TxnHandler.java
Author: hashutosh
Date: Fri Jun 20 17:33:31 2014
New Revision: 1604222
URL: http://svn.apache.org/r1604222
Log:
HIVE-6967 : Hive transaction manager fails when SQLServer is used as an RDBMS (Alan Gates via Ashutosh Chauhan)
Modified:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1604222&r1=1604221&r2=1604222&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Fri Jun 20 17:33:31 2014
@@ -52,7 +52,7 @@ public class CompactionTxnHandler extend
* or runAs set since these are only potential compactions not actual ones.
*/
public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
Set<CompactionInfo> response = new HashSet<CompactionInfo>();
try {
Statement stmt = dbConn.createStatement();
@@ -105,7 +105,7 @@ public class CompactionTxnHandler extend
*/
public void setRunAs(long cq_id, String user) throws MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
Statement stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
@@ -143,13 +143,13 @@ public class CompactionTxnHandler extend
*/
public CompactionInfo findNextToCompact(String workerId) throws MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
CompactionInfo info = new CompactionInfo();
try {
Statement stmt = dbConn.createStatement();
String s = "select cq_id, cq_database, cq_table, cq_partition, " +
- "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "' for update";
+ "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
@@ -207,7 +207,7 @@ public class CompactionTxnHandler extend
*/
public void markCompacted(CompactionInfo info) throws MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
Statement stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
@@ -246,7 +246,7 @@ public class CompactionTxnHandler extend
* @return information on the entry in the queue.
*/
public List<CompactionInfo> findReadyToClean() throws MetaException {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
try {
@@ -293,7 +293,7 @@ public class CompactionTxnHandler extend
*/
public void markCleaned(CompactionInfo info) throws MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
Statement stmt = dbConn.createStatement();
String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
@@ -384,7 +384,7 @@ public class CompactionTxnHandler extend
*/
public void cleanEmptyAbortedTxns() throws MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
Statement stmt = dbConn.createStatement();
String s = "select txn_id from TXNS where " +
@@ -440,7 +440,7 @@ public class CompactionTxnHandler extend
*/
public void revokeFromLocalWorkers(String hostname) throws MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
Statement stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
@@ -484,7 +484,7 @@ public class CompactionTxnHandler extend
*/
public void revokeTimedoutWorkers(long timeout) throws MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
long latestValidStart = getDbTime(dbConn) - timeout;
try {
Statement stmt = dbConn.createStatement();
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1604222&r1=1604221&r2=1604222&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Fri Jun 20 17:33:31 2014
@@ -66,8 +66,8 @@ public class TxnHandler {
static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
static private BoneCP connPool;
- private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock
- // method
+ private static final Boolean lockLock = new Boolean("true"); // Random object to lock on for the
+ // lock method
/**
* Number of consecutive deadlocks we have seen
@@ -119,13 +119,12 @@ public class TxnHandler {
// open transactions. To avoid needing a transaction on the underlying
// database we'll look at the current transaction number first. If it
// subsequently shows up in the open list that's ok.
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
try {
Statement stmt = dbConn.createStatement();
- LOG.debug("Going to execute query <select ntxn_next - 1 from " +
- "NEXT_TXN_ID>");
- ResultSet rs =
- stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+ String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction tables not properly " +
"initialized, no record found in next_txn_id");
@@ -137,8 +136,9 @@ public class TxnHandler {
}
List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
- LOG.debug("Going to execute query<select txn_id, txn_state from TXNS>");
- rs = stmt.executeQuery("select txn_id, txn_state, txn_user, txn_host from TXNS");
+ s = "select txn_id, txn_state, txn_user, txn_host from TXNS";
+ LOG.debug("Going to execute query<" + s + ">");
+ rs = stmt.executeQuery(s);
while (rs.next()) {
char c = rs.getString(2).charAt(0);
TxnState state;
@@ -179,14 +179,13 @@ public class TxnHandler {
// open transactions. To avoid needing a transaction on the underlying
// database we'll look at the current transaction number first. If it
// subsequently shows up in the open list that's ok.
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
try {
timeOutTxns(dbConn);
Statement stmt = dbConn.createStatement();
- LOG.debug("Going to execute query <select ntxn_next - 1 from " +
- "NEXT_TXN_ID>");
- ResultSet rs =
- stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+ String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction tables not properly " +
"initialized, no record found in next_txn_id");
@@ -198,8 +197,9 @@ public class TxnHandler {
}
Set<Long> openList = new HashSet<Long>();
- LOG.debug("Going to execute query<select txn_id from TXNS>");
- rs = stmt.executeQuery("select txn_id from TXNS");
+ s = "select txn_id from TXNS";
+ LOG.debug("Going to execute query<" + s + ">");
+ rs = stmt.executeQuery(s);
while (rs.next()) {
openList.add(rs.getLong(1));
}
@@ -234,7 +234,7 @@ public class TxnHandler {
public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
int numTxns = rqst.getNum_txns();
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
// Make sure the user has not requested an insane amount of txns.
int maxTxns = HiveConf.getIntVar(conf,
@@ -242,16 +242,15 @@ public class TxnHandler {
if (numTxns > maxTxns) numTxns = maxTxns;
Statement stmt = dbConn.createStatement();
- LOG.debug("Going to execute query <select ntxn_next from NEXT_TXN_ID " +
- " for update>");
- ResultSet rs =
- stmt.executeQuery("select ntxn_next from NEXT_TXN_ID for update");
+ String s = "select ntxn_next from NEXT_TXN_ID";
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction database not properly " +
"configured, can't find next transaction id.");
}
long first = rs.getLong(1);
- String s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
+ s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
long now = getDbTime(dbConn);
@@ -292,7 +291,7 @@ public class TxnHandler {
public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
long txnid = rqst.getTxnid();
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
List<Long> txnids = new ArrayList<Long>(1);
txnids.add(txnid);
@@ -327,7 +326,7 @@ public class TxnHandler {
throws NoSuchTxnException, TxnAbortedException, MetaException {
long txnid = rqst.getTxnid();
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
Statement stmt = dbConn.createStatement();
// Before we do the commit heartbeat the txn. This is slightly odd in that we're going to
@@ -382,7 +381,7 @@ public class TxnHandler {
public LockResponse lock(LockRequest rqst)
throws NoSuchTxnException, TxnAbortedException, MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
return lock(dbConn, rqst, true);
} catch (SQLException e) {
@@ -407,7 +406,7 @@ public class TxnHandler {
public LockResponse lockNoWait(LockRequest rqst)
throws NoSuchTxnException, TxnAbortedException, MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
return lock(dbConn, rqst, false);
} catch (SQLException e) {
@@ -432,7 +431,7 @@ public class TxnHandler {
public LockResponse checkLock(CheckLockRequest rqst)
throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
long extLockId = rqst.getLockid();
// Clean up timed out locks
@@ -468,7 +467,7 @@ public class TxnHandler {
public void unlock(UnlockRequest rqst)
throws NoSuchLockException, TxnOpenException, MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
// Odd as it seems, we need to heartbeat first because this touches the
// lock table and assures that our locks our still valid. If they are
@@ -519,7 +518,7 @@ public class TxnHandler {
}
public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
ShowLocksResponse rsp = new ShowLocksResponse();
List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
try {
@@ -571,7 +570,7 @@ public class TxnHandler {
public void heartbeat(HeartbeatRequest ids)
throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
heartbeatLock(dbConn, ids.getLockid());
heartbeatTxn(dbConn, ids.getTxnid());
@@ -597,7 +596,7 @@ public class TxnHandler {
public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
throws MetaException {
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse();
Set<Long> nosuch = new HashSet<Long>();
Set<Long> aborted = new HashSet<Long>();
@@ -634,12 +633,12 @@ public class TxnHandler {
public void compact(CompactionRequest rqst) throws MetaException {
// Put a compaction request in the queue.
try {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
try {
Statement stmt = dbConn.createStatement();
// Get the id for the next entry in the queue
- String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID for update";
+ String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID";
LOG.debug("going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
@@ -717,7 +716,7 @@ public class TxnHandler {
public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
ShowCompactResponse response = new ShowCompactResponse();
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
try {
Statement stmt = dbConn.createStatement();
String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
@@ -765,7 +764,7 @@ public class TxnHandler {
* For testing only, do not use.
*/
int numLocksInLockTable() throws SQLException, MetaException {
- Connection dbConn = getDbConn();
+ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
try {
Statement stmt = dbConn.createStatement();
String s = "select count(*) from HIVE_LOCKS";
@@ -794,11 +793,18 @@ public class TxnHandler {
}
- protected Connection getDbConn() throws MetaException {
+ /**
+ * Get a connection to the database
+ * @param isolationLevel desired isolation level. If you are doing _any_ data modifications
+ * you should request serializable, else read committed should be fine.
+ * @return db connection
+ * @throws MetaException if the connection cannot be obtained
+ */
+ protected Connection getDbConn(int isolationLevel) throws MetaException {
try {
Connection dbConn = connPool.getConnection();
dbConn.setAutoCommit(false);
- dbConn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+ dbConn.setTransactionIsolation(isolationLevel);
return dbConn;
} catch (SQLException e) {
String msg = "Unable to get jdbc connection from pool, " + e.getMessage();
@@ -999,7 +1005,7 @@ public class TxnHandler {
}
}
- private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING};
+ private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING}
// A jump table to figure out whether to wait, acquire,
// or keep looking . Since
@@ -1100,12 +1106,10 @@ public class TxnHandler {
try {
Statement stmt = dbConn.createStatement();
- // Get the next lock id. We have to do this as select for update so no
- // one else reads it and updates it under us.
- LOG.debug("Going to execute query <select nl_next from NEXT_LOCK_ID " +
- "for update>");
- ResultSet rs = stmt.executeQuery("select nl_next from NEXT_LOCK_ID " +
- "for update");
+ // Get the next lock id.
+ String s = "select nl_next from NEXT_LOCK_ID";
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
LOG.debug("Going to rollback");
dbConn.rollback();
@@ -1113,7 +1117,7 @@ public class TxnHandler {
"initialized, no record found in next_lock_id");
}
long extLockId = rs.getLong(1);
- String s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
+ s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
LOG.debug("Going to commit.");
@@ -1261,7 +1265,6 @@ public class TxnHandler {
query.append("))");
}
}
- query.append(" for update");
LOG.debug("Going to execute query <" + query.toString() + ">");
ResultSet rs = stmt.executeQuery(query.toString());
@@ -1360,7 +1363,7 @@ public class TxnHandler {
}
private void wait(Connection dbConn, Savepoint save) throws SQLException {
- // Need to rollback because we did a select for update but we didn't
+ // Need to rollback because we did a select that acquired locks but we didn't
// actually update anything. Also, we may have locked some locks as
// acquired that we now want to not acquire. It's ok to rollback because
// once we see one wait, we're done, we won't look for more.
@@ -1418,8 +1421,7 @@ public class TxnHandler {
Statement stmt = dbConn.createStatement();
long now = getDbTime(dbConn);
// We need to check whether this transaction is valid and open
- String s = "select txn_state from TXNS where txn_id = " +
- txnid + " for update";
+ String s = "select txn_state from TXNS where txn_id = " + txnid;
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {