You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/04/03 08:28:57 UTC

svn commit: r1584266 - /hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java

Author: hashutosh
Date: Thu Apr  3 06:28:57 2014
New Revision: 1584266

URL: http://svn.apache.org/r1584266
Log:
HIVE-6788 : Abandoned opened transactions not being timed out (Alan Gates via Ashutosh Chauhan)

Modified:
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1584266&r1=1584265&r2=1584266&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Thu Apr  3 06:28:57 2014
@@ -180,6 +180,7 @@ public class TxnHandler {
     // subsequently shows up in the open list that's ok.
     Connection dbConn = getDbConn();
     try {
+      timeOutTxns(dbConn);
       Statement stmt = dbConn.createStatement();
       LOG.debug("Going to execute query <select ntxn_next - 1 from " +
           "NEXT_TXN_ID>");
@@ -292,17 +293,9 @@ public class TxnHandler {
     try {
       Connection dbConn = getDbConn();
       try {
-        Statement stmt = dbConn.createStatement();
-
-        // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
-        String s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-
-        s = "update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        int updateCnt = stmt.executeUpdate(s);
-        if (updateCnt != 1) {
+        List<Long> txnids = new ArrayList<Long>(1);
+        txnids.add(txnid);
+        if (abortTxns(dbConn, txnids) != 1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
           throw new NoSuchTxnException("No such transaction: " + txnid);
@@ -951,6 +944,44 @@ public class TxnHandler {
   }
 
   /**
+   * Abort a group of txns: delete their HIVE_LOCKS rows, mark them aborted in TXNS, and commit.
+   * @param dbConn An active connection; this method calls commit() on it before returning
+   * @param txnids list of transactions to abort; must be non-empty (empty yields "in ()")
+   * @return number of TXNS rows whose txn_state was set to aborted by the update
+   * @throws SQLException if creating/executing either statement or the commit fails
+   */
+  private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
+    Statement stmt = dbConn.createStatement(); // NOTE(review): stmt is never closed
+
+    // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS (lock ordering)
+    StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in (");
+    boolean first = true;
+    for (Long id : txnids) {
+      if (first) first = false;
+      else buf.append(',');
+      buf.append(id);
+    }
+    buf.append(')');
+    LOG.debug("Going to execute update <" + buf.toString() + ">");
+    stmt.executeUpdate(buf.toString());
+
+    buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in (");
+    first = true;
+    for (Long id : txnids) {
+      if (first) first = false;
+      else buf.append(',');
+      buf.append(id);
+    }
+    buf.append(')');
+    LOG.debug("Going to execute update <" + buf.toString() + ">");
+    int updateCnt = stmt.executeUpdate(buf.toString());
+
+    LOG.debug("Going to commit");
+    dbConn.commit(); // committed here, so a caller's later rollback() cannot undo these changes
+    return updateCnt;
+  }
+
+  /**
    * Request a lock
    * @param dbConn database connection
    * @param rqst lock information
@@ -1185,6 +1216,7 @@ public class TxnHandler {
 
       // Look at everything in front of this lock to see if it should block
       // it or not.
+      boolean acquired = false;
       for (int i = index - 1; i >= 0; i--) {
         // Check if we're operating on the same database, if not, move on
         if (!locks[index].db.equals(locks[i].db)) {
@@ -1213,6 +1245,7 @@ public class TxnHandler {
             (locks[i].state)) {
           case ACQUIRE:
             acquire(dbConn, stmt, extLockId, info.intLockId);
+            acquired = true;
             break;
           case WAIT:
             wait(dbConn, save);
@@ -1227,11 +1260,13 @@ public class TxnHandler {
           case KEEP_LOOKING:
             continue;
         }
+        if (acquired) break; // We've acquired this lock component,
+        // so get out of the loop and look at the next component.
       }
 
-      // If we've arrived here it means there's nothing in the way of the
-      // lock, so acquire the lock.
-      acquire(dbConn, stmt, extLockId, info.intLockId);
+      // If we've arrived here and we have not already acquired, it means there's nothing in the
+      // way of the lock, so acquire the lock.
+      if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId);
     }
 
     // We acquired all of the locks, so commit and return acquired.
@@ -1378,7 +1413,27 @@ public class TxnHandler {
     return;
   }
 
- private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
+  // Abort timed out transactions.  This calls abortTxns(), which does a commit,
+  // and thus should be done before any calls to heartbeat that will leave
+  // open transactions on the underlying database.  At most 20 txns are aborted per call.
+  private void timeOutTxns(Connection dbConn) throws SQLException {
+    long now = System.currentTimeMillis();
+    Statement stmt = dbConn.createStatement(); // NOTE(review): stmt and rs are never closed
+    // Find open txns whose last heartbeat is older than now - timeout (presumably in ms).
+    String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN +
+        "' and txn_last_heartbeat <  " + (now - timeout);
+    LOG.debug("Going to execute query <" + s + ">");
+    ResultSet rs = stmt.executeQuery(s);
+    List<Long> deadTxns = new ArrayList<Long>();
+    // Limit the number of timed out transactions we abort in one pass to keep from generating
+    // huge delete/update statements in abortTxns.
+    for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1));
+    // We ignore how many of the transactions were actually aborted; if some were not,
+    // it most likely means someone else aborted them in the interim.
+    if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns);
+  }
+
+  private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
     if (connPool != null) return;
 
     String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY);