You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/04/27 07:45:38 UTC

[hive] branch master updated: HIVE-23283: Generate random temp ID for lock enqueue and commitTxn (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b8d474f  HIVE-23283: Generate random temp ID for lock enqueue and commitTxn (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)
b8d474f is described below

commit b8d474fbafe9ad8115f88b5ba646f686d2c7a92b
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Mon Apr 27 09:44:58 2020 +0200

    HIVE-23283: Generate random temp ID for lock enqueue and commitTxn (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)
---
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 4a6fa6f..5807868 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -45,6 +45,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
@@ -182,8 +183,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
   private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
-  private static final Long TEMP_HIVE_LOCK_ID = -1L;
-  private static final Long TEMP_COMMIT_ID = -1L;
 
   private static DataSource connPool;
   private static DataSource connPoolMutex;
@@ -1136,6 +1135,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         String conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN(" +
                 quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")";
 
+        long tempCommitId = generateTemporaryId();
         if (txnRecord.type != TxnType.READ_ONLY
                 && !rqst.isSetReplPolicy()
                 && isUpdateOrDelete(stmt, conflictSQLSuffix)) {
@@ -1151,12 +1151,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            * even if it includes all of its columns
            *
            * First insert into write_set using a temporary commitID, which will be updated in a separate call,
-           * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long)}}.
+           * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}.
            * This should decrease the scope of the S4U lock on the next_txn_id table.
            */
           Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
           stmt.executeUpdate("INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" +
-                          " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + TEMP_COMMIT_ID + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix);
+                          " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + tempCommitId + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix);
 
           /**
            * This S4U will mutex with other commitTxn() and openTxns().
@@ -1245,7 +1245,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
           deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
-        updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId);
+        updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId);
         if (rqst.isSetKeyValue()) {
           updateKeyValueAssociatedWithTxn(rqst, stmt);
         }
@@ -1330,12 +1330,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId) throws SQLException {
+  private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId, long tempId) throws SQLException {
     List<String> queryBatch = new ArrayList<>(5);
     // update write_set with real commitId
     if (commitId != null) {
       queryBatch.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " + commitId +
-              " WHERE \"WS_COMMIT_ID\" = " + TEMP_COMMIT_ID + " AND \"WS_TXNID\" = " + txnid);
+              " WHERE \"WS_COMMIT_ID\" = " + tempId + " AND \"WS_TXNID\" = " + txnid);
     }
     // clean up txn related metadata
     if (txnType != TxnType.READ_ONLY) {
@@ -2394,7 +2394,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         /* Insert txn components and hive locks (with a temp extLockId) first, before getting the next lock ID in a select-for-update.
            This should minimize the scope of the S4U and decrease the table lock duration. */
         insertTxnComponents(txnid, rqst, dbConn);
-        insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst);
+        long tempExtLockId = insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst);
 
         /** Get the next lock id.
          * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race.
@@ -2403,7 +2403,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)}
          * doesn't block on locks acquired later than one it's checking*/
         long extLockId = getNextLockIdForUpdate(dbConn, stmt);
-        incrementLockIdAndUpdateHiveLocks(stmt, extLockId);
+        incrementLockIdAndUpdateHiveLocks(stmt, extLockId, tempExtLockId);
 
         dbConn.commit();
         success = true;
@@ -2443,10 +2443,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId) throws SQLException {
+  private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId, long tempId) throws SQLException {
     String incrCmd = String.format(INCREMENT_NEXT_LOCK_ID_QUERY, (extLockId + 1));
     // update hive locks entries with the real EXT_LOCK_ID (replace temp ID)
-    String updateLocksCmd = String.format(UPDATE_HIVE_LOCKS_EXT_ID_QUERY, extLockId, TEMP_HIVE_LOCK_ID);
+    String updateLocksCmd = String.format(UPDATE_HIVE_LOCKS_EXT_ID_QUERY, extLockId, tempId);
     LOG.debug("Going to execute updates in batch: <" + incrCmd + ">, and <" + updateLocksCmd + ">");
     stmt.addBatch(incrCmd);
     stmt.addBatch(updateLocksCmd);
@@ -2567,11 +2567,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException {
+  private long insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException {
 
     String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
     String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
     long intLockId = 0;
+    long tempExtLockId = generateTemporaryId();
 
     try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
       for (LockComponent lc : rqst.getComponent()) {
@@ -2589,7 +2590,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         intLockId++;
         String lockType = LockTypeUtil.getEncodingAsStr(lc.getType());
 
-        pstmt.setLong(1, TEMP_HIVE_LOCK_ID);
+        pstmt.setLong(1, tempExtLockId);
         pstmt.setLong(2, intLockId);
         pstmt.setLong(3, txnid);
         pstmt.setString(4, normalizeCase(lc.getDbname()));
@@ -2612,6 +2613,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         pstmt.executeBatch();
       }
     }
+    return tempExtLockId;
+  }
+
+  private long generateTemporaryId() {
+    return -ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE); // random placeholder ID, strictly negative like the former -1L constants: plain nextLong() may be negative or 0, so negating it could yield a non-negative value
+  }
 
   private static String normalizeCase(String s) {