You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/04/03 08:28:57 UTC
svn commit: r1584266 - /hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
Author: hashutosh
Date: Thu Apr 3 06:28:57 2014
New Revision: 1584266
URL: http://svn.apache.org/r1584266
Log:
HIVE-6788 : Abandoned opened transactions not being timed out (Alan Gates via Ashutosh Chauhan)
Modified:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1584266&r1=1584265&r2=1584266&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Thu Apr 3 06:28:57 2014
@@ -180,6 +180,7 @@ public class TxnHandler {
// subsequently shows up in the open list that's ok.
Connection dbConn = getDbConn();
try {
+ timeOutTxns(dbConn);
Statement stmt = dbConn.createStatement();
LOG.debug("Going to execute query <select ntxn_next - 1 from " +
"NEXT_TXN_ID>");
@@ -292,17 +293,9 @@ public class TxnHandler {
try {
Connection dbConn = getDbConn();
try {
- Statement stmt = dbConn.createStatement();
-
- // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
- String s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
-
- s = "update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- int updateCnt = stmt.executeUpdate(s);
- if (updateCnt != 1) {
+ List<Long> txnids = new ArrayList<Long>(1);
+ txnids.add(txnid);
+ if (abortTxns(dbConn, txnids) != 1) {
LOG.debug("Going to rollback");
dbConn.rollback();
throw new NoSuchTxnException("No such transaction: " + txnid);
@@ -951,6 +944,44 @@ public class TxnHandler {
}
/**
+ * Abort a group of transactions: remove their entries from HIVE_LOCKS, mark them as
+ * aborted in TXNS, and commit on the given connection.
+ * @param dbConn An active connection; this method commits on it
+ * @param txnids list of transactions to abort; if empty, nothing is executed and 0 is returned
+ *               (an empty "in ()" list would be invalid SQL)
+ * @return number of TXNS rows updated to the aborted state (callers compare this against the
+ *         number of ids they expected to abort)
+ * @throws SQLException if either statement or the commit fails
+ */
+ private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
+   if (txnids.isEmpty()) return 0;
+
+   // Build the comma separated id list once; both statements target the same transactions.
+   StringBuilder ids = new StringBuilder();
+   for (Long id : txnids) {
+     if (ids.length() > 0) ids.append(',');
+     ids.append(id);
+   }
+
+   Statement stmt = dbConn.createStatement();
+   try {
+     // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
+     String s = "delete from HIVE_LOCKS where hl_txnid in (" + ids + ")";
+     LOG.debug("Going to execute update <" + s + ">");
+     stmt.executeUpdate(s);
+
+     s = "update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in (" + ids + ")";
+     LOG.debug("Going to execute update <" + s + ">");
+     int updateCnt = stmt.executeUpdate(s);
+
+     LOG.debug("Going to commit");
+     dbConn.commit();
+     return updateCnt;
+   } finally {
+     // Release the JDBC statement even if one of the updates or the commit fails.
+     stmt.close();
+   }
+ }
+
+ /**
* Request a lock
* @param dbConn database connection
* @param rqst lock information
@@ -1185,6 +1216,7 @@ public class TxnHandler {
// Look at everything in front of this lock to see if it should block
// it or not.
+ boolean acquired = false;
for (int i = index - 1; i >= 0; i--) {
// Check if we're operating on the same database, if not, move on
if (!locks[index].db.equals(locks[i].db)) {
@@ -1213,6 +1245,7 @@ public class TxnHandler {
(locks[i].state)) {
case ACQUIRE:
acquire(dbConn, stmt, extLockId, info.intLockId);
+ acquired = true;
break;
case WAIT:
wait(dbConn, save);
@@ -1227,11 +1260,13 @@ public class TxnHandler {
case KEEP_LOOKING:
continue;
}
+ if (acquired) break; // We've acquired this lock component,
+ // so get out of the loop and look at the next component.
}
- // If we've arrived here it means there's nothing in the way of the
- // lock, so acquire the lock.
- acquire(dbConn, stmt, extLockId, info.intLockId);
+ // If we've arrived here and we have not already acquired, it means there's nothing in the
+ // way of the lock, so acquire the lock.
+ if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId);
}
// We acquired all of the locks, so commit and return acquired.
@@ -1378,7 +1413,27 @@ public class TxnHandler {
return;
}
- private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
+ // Abort timed out transactions. This calls abortTxns(), which does a commit,
+ // and thus should be done before any calls to heartbeat that will leave
+ // open transactions on the underlying database.
+ // NOTE(review): timeout is subtracted from System.currentTimeMillis(), so it is
+ // presumably expressed in milliseconds; confirm where the field is initialized.
+ private void timeOutTxns(Connection dbConn) throws SQLException {
+   // Limit the number of timed out transactions we abort in one pass to keep from
+   // generating huge delete/update statements in abortTxns().
+   final int maxTxnsPerPass = 20;
+   long now = System.currentTimeMillis();
+   List<Long> deadTxns = new ArrayList<Long>();
+   Statement stmt = dbConn.createStatement();
+   try {
+     // Find open transactions whose last heartbeat is older than the timeout.
+     String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN +
+         "' and txn_last_heartbeat < " + (now - timeout);
+     LOG.debug("Going to execute query <" + s + ">");
+     ResultSet rs = stmt.executeQuery(s);
+     try {
+       for (int i = 0; i < maxTxnsPerPass && rs.next(); i++) deadTxns.add(rs.getLong(1));
+     } finally {
+       rs.close();
+     }
+   } finally {
+     // Close the query statement before abortTxns() opens its own.
+     stmt.close();
+   }
+   // We don't care whether all of the transactions get aborted or not;
+   // if some weren't, it most likely means someone else aborted them in the interim.
+   if (!deadTxns.isEmpty()) abortTxns(dbConn, deadTxns);
+ }
+
+ private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
if (connPool != null) return;
String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY);