You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/25 19:54:41 UTC

[2/2] hive git commit: HIVE-17340 TxnHandler.checkLock() - reduce number of SQL statements (Eugene Koifman, reviewed by Wei Zheng)

HIVE-17340 TxnHandler.checkLock() - reduce number of SQL statements (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/75671197
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/75671197
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/75671197

Branch: refs/heads/master
Commit: 75671197cfca8d013bc7b6a6346d6318b10ca83c
Parents: 1938eeb
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Aug 25 12:49:17 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Aug 25 12:49:17 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 81 +++++++++++---------
 1 file changed, 46 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/75671197/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 71e7c0c..5bfc029 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -2444,10 +2444,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    *
    * This is expected to run at READ_COMMITTED.
    *
-   * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take
-   * all locks for given extLockId or none.  Would be more efficient to update state on all locks
-   * at once.  Semantics are the same since this is all part of the same txn.
-   *
    * If there is a concurrent commitTxn/rollbackTxn, those can only remove rows from HIVE_LOCKS.
    * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid,
    * checkLock() will in the worst case keep locks in Waiting state a little longer.
@@ -2654,7 +2650,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         // Look at everything in front of this lock to see if it should block
         // it or not.
-        boolean acquired = false;
         for (int i = index - 1; i >= 0; i--) {
           // Check if we're operating on the same database, if not, move on
           if (!locks[index].db.equals(locks[i].db)) {
@@ -2706,20 +2701,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               }
               //fall through to ACQUIRE
             case ACQUIRE:
-              acquire(dbConn, stmt, extLockId, info);
-              acquired = true;
               break;
             case KEEP_LOOKING:
               continue;
           }
-          if (acquired) break; // We've acquired this lock component,
-          // so get out of the loop and look at the next component.
+          //if we got here, it means it's ok to acquire 'info' lock
+          break;// so exit the loop and check next lock
         }
-
-        // If we've arrived here and we have not already acquired, it means there's nothing in the
-        // way of the lock, so acquire the lock.
-        if (!acquired) acquire(dbConn, stmt, extLockId, info);
       }
+      //if here, ther were no locks that blocked any locks in 'locksBeingChecked' - acquire them all
+      acquire(dbConn, stmt, locksBeingChecked);
 
       // We acquired all of the locks, so commit and return acquired.
       LOG.debug("Going to commit");
@@ -2733,6 +2724,48 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
     return response;
   }
+  private void acquire(Connection dbConn, Statement stmt, List<LockInfo> locksBeingChecked)
+    throws SQLException, NoSuchLockException, MetaException {
+    if(locksBeingChecked == null || locksBeingChecked.isEmpty()) {
+      return;
+    }
+    long txnId = locksBeingChecked.get(0).txnId;
+    long extLockId = locksBeingChecked.get(0).extLockId;
+    long now = getDbTime(dbConn);
+    String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
+      //if lock is part of txn, heartbeat info is in txn record
+      "hl_last_heartbeat = " + (isValidTxn(txnId) ? 0 : now) +
+      ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" +
+      " where hl_lock_ext_id = " +  extLockId;
+    LOG.debug("Going to execute update <" + s + ">");
+    int rc = stmt.executeUpdate(s);
+    if (rc < locksBeingChecked.size()) {
+      LOG.debug("Going to rollback acquire(Connection dbConn, Statement stmt, List<LockInfo> locksBeingChecked)");
+      dbConn.rollback();
+      /*select all locks for this ext ID and see which ones are missing*/
+      StringBuilder sb = new StringBuilder("No such lock(s): (" + JavaUtils.lockIdToString(extLockId) + ":");
+      ResultSet rs = stmt.executeQuery("select hl_lock_int_id from HIVE_LOCKS where hl_lock_ext_id = " + extLockId);
+      while(rs.next()) {
+        int intLockId = rs.getInt(1);
+        int idx = 0;
+        for(; idx < locksBeingChecked.size(); idx++) {
+          LockInfo expectedLock = locksBeingChecked.get(idx);
+          if(expectedLock != null && expectedLock.intLockId == intLockId) {
+            locksBeingChecked.set(idx, null);
+            break;
+          }
+        }
+      }
+      for(LockInfo expectedLock : locksBeingChecked) {
+        if(expectedLock != null) {
+          sb.append(expectedLock.intLockId).append(",");
+        }
+      }
+      sb.append(") ").append(JavaUtils.txnIdToString(txnId));
+      close(rs);
+      throw new NoSuchLockException(sb.toString());
+    }
+  }
 
   /**
    * the {@link #jumpTable} only deals with LockState/LockType.  In some cases it's not
@@ -2775,28 +2808,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     LOG.debug("Going to rollback to savepoint");
     dbConn.rollback(save);
   }
-
-  private void acquire(Connection dbConn, Statement stmt, long extLockId, LockInfo lockInfo)
-    throws SQLException, NoSuchLockException, MetaException {
-    long now = getDbTime(dbConn);
-    String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
-      //if lock is part of txn, heartbeat info is in txn record
-      "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) +
-    ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + " where hl_lock_ext_id = " +
-      extLockId + " and hl_lock_int_id = " + lockInfo.intLockId;
-    LOG.debug("Going to execute update <" + s + ">");
-    int rc = stmt.executeUpdate(s);
-    if (rc < 1) {
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-      throw new NoSuchLockException("No such lock: (" + JavaUtils.lockIdToString(extLockId) + "," +
-        + lockInfo.intLockId + ") " + JavaUtils.txnIdToString(lockInfo.txnId));
-    }
-    // We update the database, but we don't commit because there may be other
-    // locks together with this, and we only want to acquire one if we can
-    // acquire all.
-  }
-
   /**
    * Heartbeats on the lock table.  This commits, so do not enter it with any state.
    * Should not be called on a lock that belongs to transaction.