Posted to commits@hive.apache.org by pv...@apache.org on 2020/04/02 11:39:57 UTC

[hive] branch master updated: HIVE-23052: Optimize lock enqueueing in TxnHandler (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8739c58  HIVE-23052: Optimize lock enqueueing in TxnHandler (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)
8739c58 is described below

commit 8739c5838a849cd487ebfe474250d847bb5ac208
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Thu Apr 2 13:37:02 2020 +0200

    HIVE-23052: Optimize lock enqueueing in TxnHandler (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)
---
 .../datasource/BoneCPDataSourceProvider.java       |   1 +
 .../datasource/DbCPDataSourceProvider.java         |   1 +
 .../datasource/HikariCPDataSourceProvider.java     |   1 +
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 436 ++++++++++++---------
 4 files changed, 244 insertions(+), 195 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
index f92ce73..de202c9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
@@ -89,6 +89,7 @@ public class BoneCPDataSourceProvider implements DataSourceProvider {
     DatabaseProduct dbProduct =  determineDatabaseProduct(driverUrl);
     switch (dbProduct){
       case MYSQL:
+        connProperties.put("allowMultiQueries", true);
         connProperties.put("rewriteBatchedStatements", true);
         break;
       case POSTGRES:
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
index 85719fd..12cacd2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
@@ -71,6 +71,7 @@ public class DbCPDataSourceProvider implements DataSourceProvider {
     DatabaseProduct dbProduct =  determineDatabaseProduct(driverUrl);
     switch (dbProduct){
       case MYSQL:
+        dbcpDs.setConnectionProperties("allowMultiQueries=true");
         dbcpDs.setConnectionProperties("rewriteBatchedStatements=true");
         break;
       case POSTGRES:
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
index 76bbf3b..3e871e9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
@@ -76,6 +76,7 @@ public class HikariCPDataSourceProvider implements DataSourceProvider {
     switch (dbProduct){
       case MYSQL:
         config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES");
+        config.addDataSourceProperty("allowMultiQueries", true);
         config.addDataSourceProperty("rewriteBatchedStatements", true);
         break;
       case POSTGRES:
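
All three connection-pool providers now enable MySQL Connector/J's allowMultiQueries property alongside the existing rewriteBatchedStatements. allowMultiQueries lets a single round trip carry multiple semicolon-separated statements, which complements the Statement batches introduced in TxnHandler below (e.g. the NEXT_LOCK_ID increment and the HIVE_LOCKS ext-lock-id update batched together). A minimal sketch of the same two settings expressed on a raw JDBC URL (host and database names are placeholders):

    jdbc:mysql://metastore-host:3306/hive?allowMultiQueries=true&rewriteBatchedStatements=true
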
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 74ef885..e8a988c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +53,7 @@ import javax.sql.DataSource;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -179,6 +181,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
   private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
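+  // Placeholder HL_LOCK_EXT_ID written by insertHiveLocksWithTemporaryExtLockId; replaced with
+  // the real lock id once NEXT_LOCK_ID has been read under select-for-update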
+  private static final Long TEMP_HIVE_LOCK_ID = -1L;
 
   private static DataSource connPool;
   private static DataSource connPoolMutex;
@@ -190,6 +193,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
 
+  private static final String TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"TXN_COMPONENTS\" (" +
+          "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
+          " VALUES (?, ?, ?, ?, ?, ?)";
+  private static final String INCREMENT_NEXT_LOCK_ID_QUERY = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = %s";
+  private static final String UPDATE_HIVE_LOCKS_EXT_ID_QUERY = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = %s WHERE \"HL_LOCK_EXT_ID\" = %s";
+  private static final String SELECT_WRITE_ID_QUERY = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
+          + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?";
+
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
 
   /**
@@ -2394,9 +2405,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Connection dbConn = null;
     try {
       Statement stmt = null;
-      PreparedStatement pStmt = null;
-      List<PreparedStatement> insertPreparedStmts = null;
-      ResultSet rs = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -2410,175 +2418,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             shouldNeverHappen(txnid);
           }
         }
+        /* Insert txn components and hive locks (with a temp extLockId) first, before getting the next lock ID in a select-for-update.
+           This should minimize the scope of the S4U and decrease the table lock duration. */
+        insertTxnComponents(txnid, rqst, dbConn);
+        insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst);
+
         /** Get the next lock id.
          * This has to be atomic with adding entries to HIVE_LOCKS (first added in W state) to prevent a race.
          * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running.  1st one generates nl_next=7,
          * 2nd nl_next=8.  Then 8 goes first to insert into HIVE_LOCKS and acquires the locks.  Then 7 unblocks
          * and adds its W locks, but it won't see locks from 8 since, to be 'fair', {@link #checkLock(java.sql.Connection, long)}
          * doesn't block on locks acquired later than the one it's checking*/
-        String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\"");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_lock_id");
-        }
-        long extLockId = rs.getLong(1);
-        s = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (extLockId + 1);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-
-        if (txnid > 0) {
-          List<String> rows = new ArrayList<>();
-          List<List<String>> paramsList = new ArrayList<>();
-          // For each component in this lock request,
-          // add an entry to the txn_components table
-          for (LockComponent lc : rqst.getComponent()) {
-            if(lc.isSetIsTransactional() && !lc.isIsTransactional()) {
-              //we don't prevent using non-acid resources in a txn but we do lock them
-              continue;
-            }
-            boolean updateTxnComponents;
-            if(!lc.isSetOperationType()) {
-              //request came from old version of the client
-              updateTxnComponents = true;//this matches old behavior
-            }
-            else {
-              switch (lc.getOperationType()) {
-                case INSERT:
-                case UPDATE:
-                case DELETE:
-                  if(!lc.isSetIsDynamicPartitionWrite()) {
-                    //must be old client talking, i.e. we don't know if it's DP so be conservative
-                    updateTxnComponents = true;
-                  }
-                  else {
-                    /**
-                     * we know this is part of DP operation and so we'll get
-                     * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
-                     * of partitions actually chaged.
-                     */
-                    updateTxnComponents = !lc.isIsDynamicPartitionWrite();
-                  }
-                  break;
-                case SELECT:
-                  updateTxnComponents = false;
-                  break;
-                case NO_TXN:
-                  /*this constant is a bit of a misnomer since we now always have a txn context.  It
-                   just means the operation is such that we don't care what tables/partitions it
-                   affected as it doesn't trigger a compaction or conflict detection.  A better name
-                   would be NON_TRANSACTIONAL.*/
-                  updateTxnComponents = false;
-                  break;
-                default:
-                  //since we have an open transaction, only 4 values above are expected
-                  throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
-                    + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
-              }
-            }
-            if(!updateTxnComponents) {
-              continue;
-            }
-            String dbName = normalizeCase(lc.getDbname());
-            String tblName = normalizeCase(lc.getTablename());
-            String partName = normalizeCase(lc.getPartitionname());
-            Long writeId = null;
-            if (tblName != null) {
-              // It is assumed the caller have already allocated write id for adding/updating data to
-              // the acid tables. However, DDL operatons won't allocate write id and hence this query
-              // may return empty result sets.
-              // Get the write id allocated by this txn for the given table writes
-              s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
-                      + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid;
-              pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName));
-              LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
-                      quoteString(dbName), quoteString(tblName));
-              rs = pStmt.executeQuery();
-              if (rs.next()) {
-                writeId = rs.getLong(1);
-              }
-            }
-            rows.add(txnid + ", ?, " +
-                    (tblName == null ? "null" : "?") + ", " +
-                    (partName == null ? "null" : "?")+ "," +
-                    quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," +
-                    (writeId == null ? "null" : writeId));
-            List<String> params = new ArrayList<>();
-            params.add(dbName);
-            if (tblName != null) {
-              params.add(tblName);
-            }
-            if (partName != null) {
-              params.add(partName);
-            }
-            paramsList.add(params);
-          }
-          insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-              "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")",
-                  rows, paramsList);
-          for(PreparedStatement pst : insertPreparedStmts) {
-            int modCount = pst.executeUpdate();
-            closeStmt(pst);
-          }
-          insertPreparedStmts = null;
-        }
-        String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
-        String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
-
-        int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
-        long intLockId = 0;
-
-        try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
-          for (LockComponent lc : rqst.getComponent()) {
-            if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
-              (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
-              //Old version of thrift client should have (lc.isSetOperationType() == false), but they do not
-              //If you add a default value to a variable, isSet() for that variable is true regardless of the where the
-              //message was created (for object variables).
-              //It works correctly for boolean vars, e.g. LockComponent.isTransactional).
-              //in test mode, upgrades are not tested, so client version and server version of thrift always matches so
-              //we see UNSET here it means something didn't set the appropriate value.
-              throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
-                + lc + " agentInfo=" + rqst.getAgentInfo());
-            }
-            intLockId++;
-
-            char lockChar = 'z';
-            switch (lc.getType()) {
-              case EXCLUSIVE:
-                lockChar = LOCK_EXCLUSIVE;
-                break;
-              case SHARED_READ:
-                lockChar = LOCK_SHARED;
-                break;
-              case SHARED_WRITE:
-                lockChar = LOCK_SEMI_SHARED;
-                break;
-            }
-            pstmt.setLong(1, extLockId);
-            pstmt.setLong(2, intLockId);
-            pstmt.setLong(3, txnid);
-            pstmt.setString(4, normalizeCase(lc.getDbname()));
-            pstmt.setString(5, normalizeCase(lc.getTablename()));
-            pstmt.setString(6, normalizeCase(lc.getPartitionname()));
-            pstmt.setString(7, Character.toString(LOCK_WAITING));
-            pstmt.setString(8, Character.toString(lockChar));
-            pstmt.setString(9, rqst.getUser());
-            pstmt.setString(10, rqst.getHostname());
-            pstmt.setString(11, rqst.getAgentInfo());
+        long extLockId = getNextLockIdForUpdate(dbConn, stmt);
+        incrementLockIdAndUpdateHiveLocks(stmt, extLockId);
 
-            pstmt.addBatch();
-            if (intLockId % batchSize == 0) {
-              pstmt.executeBatch();
-            }
-          }
-          if (intLockId % batchSize != 0) {
-            pstmt.executeBatch();
-          }
-        }
         dbConn.commit();
         success = true;
         return new ConnectionLockIdPair(dbConn, extLockId);
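
The hunk above reorders enqueueing so that the bulk inserts run before the select-for-update on NEXT_LOCK_ID; only the id allocation and the ext-lock-id swap execute while the NEXT_LOCK_ID row lock is held. A minimal sketch of the resulting sequence (method names as in this patch; retry and error handling elided):

    // outside the critical section: batched inserts
    insertTxnComponents(txnid, rqst, dbConn);                   // TXN_COMPONENTS rows
    insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst); // HIVE_LOCKS rows, HL_LOCK_EXT_ID = -1

    // critical section: row lock on NEXT_LOCK_ID
    long extLockId = getNextLockIdForUpdate(dbConn, stmt);      // SELECT "NL_NEXT" ... FOR UPDATE
    incrementLockIdAndUpdateHiveLocks(stmt, extLockId);         // NL_NEXT = extLockId + 1; -1 -> extLockId
    dbConn.commit();                                            // commit releases the row lock
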
@@ -2589,13 +2442,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for (PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
-        closeStmt(pStmt);
-        close(rs, stmt, null);
+        closeStmt(stmt);
         if (!success) {
           /* This needs to return a "live" connection to be used by operation that follows it.
           Thus it only closes Connection on failure/retry. */
@@ -2608,6 +2455,206 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       return enqueueLockWithRetry(rqst);
     }
   }
+
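+  /**
+   * Reads NL_NEXT from NEXT_LOCK_ID under select-for-update. The row lock this takes
+   * serializes concurrent enqueues and is held until the enclosing transaction ends.
+   */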
+  private long getNextLockIdForUpdate(Connection dbConn, Statement stmt) throws SQLException, MetaException {
+    String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\"");
+    LOG.debug("Going to execute query <" + s + ">");
+    try (ResultSet rs = stmt.executeQuery(s)) {
+      if (!rs.next()) {
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        throw new MetaException("Transaction tables not properly " +
+                "initialized, no record found in next_lock_id");
+      }
+      return rs.getLong(1);
+    }
+  }
+
+  private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId) throws SQLException {
+    String incrCmd = String.format(INCREMENT_NEXT_LOCK_ID_QUERY, (extLockId + 1));
+    // update hive locks entries with the real EXT_LOCK_ID (replace temp ID)
+    String updateLocksCmd = String.format(UPDATE_HIVE_LOCKS_EXT_ID_QUERY, extLockId, TEMP_HIVE_LOCK_ID);
+    LOG.debug("Going to execute updates in batch: <" + incrCmd + ">, and <" + updateLocksCmd + ">");
+    stmt.addBatch(incrCmd);
+    stmt.addBatch(updateLocksCmd);
+    stmt.executeBatch();
+  }
+
+  private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn) throws SQLException {
+    if (txnid > 0) {
+      Map<Pair<String, String>, Optional<Long>> writeIdCache = new HashMap<>();
+      try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
+        // For each component in this lock request,
+        // add an entry to the txn_components table
+        int insertCounter = 0;
+        int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+        for (LockComponent lc : rqst.getComponent()) {
+          if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
+            //we don't prevent using non-acid resources in a txn but we do lock them
+            continue;
+          }
+          if (!shouldUpdateTxnComponent(txnid, rqst, lc)) {
+            continue;
+          }
+          String dbName = normalizeCase(lc.getDbname());
+          String tblName = normalizeCase(lc.getTablename());
+          String partName = normalizeCase(lc.getPartitionname());
+          Optional<Long> writeId = getWriteId(writeIdCache, dbName, tblName, txnid, dbConn);
+
+          pstmt.setLong(1, txnid);
+          pstmt.setString(2, dbName);
+          pstmt.setString(3, tblName);
+          pstmt.setString(4, partName);
+          pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).toString());
+          pstmt.setObject(6, writeId.orElse(null));
+
+          pstmt.addBatch();
+          insertCounter++;
+          if (insertCounter % batchSize == 0) {
+            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+            pstmt.executeBatch();
+          }
+        }
+        if (insertCounter % batchSize != 0) {
+          LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+          pstmt.executeBatch();
+        }
+      }
+    }
+  }
+
+  private Optional<Long> getWriteId(Map<Pair<String, String>, Optional<Long>> writeIdCache, String dbName, String tblName, long txnid, Connection dbConn) throws SQLException {
+    /* we can cache writeIDs based on dbName and tblName because txnid is invariant and
+    partitionName is not part of the writeID select query */
+    Pair<String, String> dbAndTable = Pair.of(dbName, tblName);
+    if (writeIdCache.containsKey(dbAndTable)) {
+      return writeIdCache.get(dbAndTable);
+    } else {
+      Optional<Long> writeId = getWriteIdFromDb(txnid, dbConn, dbName, tblName);
+      writeIdCache.put(dbAndTable, writeId);
+      return writeId;
+    }
+  }
+
+  private Optional<Long> getWriteIdFromDb(long txnid, Connection dbConn, String dbName, String tblName) throws SQLException {
+    if (tblName != null) {
+      // It is assumed the caller has already allocated a write id for adding/updating data to
+      // the acid tables. However, DDL operations won't allocate a write id and hence this query
+      // may return an empty result set.
+      // Get the write id allocated by this txn for the given table writes
+      try (PreparedStatement pstmt = dbConn.prepareStatement(SELECT_WRITE_ID_QUERY)) {
+        pstmt.setString(1, dbName);
+        pstmt.setString(2, tblName);
+        pstmt.setLong(3, txnid);
+        LOG.debug("Going to execute query <" + SELECT_WRITE_ID_QUERY + ">");
+        try (ResultSet rs = pstmt.executeQuery()) {
+          if (rs.next()) {
+            return Optional.of(rs.getLong(1));
+          }
+        }
+      }
+    }
+    return Optional.empty();
+  }
+
+  private boolean shouldUpdateTxnComponent(long txnid, LockRequest rqst, LockComponent lc) {
+    if (!lc.isSetOperationType()) {
+      //request came from an old version of the client
+      return true; //this matches old behavior
+    }
+    else {
+      switch (lc.getOperationType()) {
+        case INSERT:
+        case UPDATE:
+        case DELETE:
+          if (!lc.isSetIsDynamicPartitionWrite()) {
+            //must be an old client talking, i.e. we don't know if it's DP so be conservative
+            return true;
+          }
+          else {
+            /**
+             * we know this is part of a DP operation and so we'll get an
+             * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
+             * of partitions actually changed.
+             */
+            return !lc.isIsDynamicPartitionWrite();
+          }
+        case SELECT:
+          return false;
+        case NO_TXN:
+          /*this constant is a bit of a misnomer since we now always have a txn context.  It
+            just means the operation is such that we don't care what tables/partitions it
+            affected as it doesn't trigger a compaction or conflict detection.  A better name
+            would be NON_TRANSACTIONAL.*/
+          return false;
+        default:
+          //since we have an open transaction, only 4 values above are expected
+          throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
+                  + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
+      }
+    }
+  }
+
+  private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException {
+
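+    // All rows are written with TEMP_HIVE_LOCK_ID (-1) as HL_LOCK_EXT_ID; the real id is
+    // assigned afterwards by incrementLockIdAndUpdateHiveLocks, under the NEXT_LOCK_ID lock.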
+    String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
+    String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
+    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+    long intLockId = 0;
+
+    try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
+      for (LockComponent lc : rqst.getComponent()) {
+        if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
+                (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
+          //Old versions of the thrift client should have (lc.isSetOperationType() == false), but they do not.
+          //If you add a default value to a variable, isSet() for that variable is true regardless of where the
+          //message was created (for object variables).
+          //It works correctly for boolean vars, e.g. LockComponent.isTransactional.
+          //In test mode, upgrades are not tested, so the client and server thrift versions always match;
+          //if we see UNSET here, it means something didn't set the appropriate value.
+          throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
+                  + lc + " agentInfo=" + rqst.getAgentInfo());
+        }
+        intLockId++;
+
+        pstmt.setLong(1, TEMP_HIVE_LOCK_ID);
+        pstmt.setLong(2, intLockId);
+        pstmt.setLong(3, txnid);
+        pstmt.setString(4, normalizeCase(lc.getDbname()));
+        pstmt.setString(5, normalizeCase(lc.getTablename()));
+        pstmt.setString(6, normalizeCase(lc.getPartitionname()));
+        pstmt.setString(7, Character.toString(LOCK_WAITING));
+        pstmt.setString(8, Character.toString(getLockChar(lc.getType())));
+        pstmt.setString(9, rqst.getUser());
+        pstmt.setString(10, rqst.getHostname());
+        pstmt.setString(11, rqst.getAgentInfo());
+
+        pstmt.addBatch();
+        if (intLockId % batchSize == 0) {
+          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + batchSize);
+          pstmt.executeBatch();
+        }
+      }
+      if (intLockId % batchSize != 0) {
+        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % batchSize);
+        pstmt.executeBatch();
+      }
+    }
+  }
+
+  private char getLockChar(LockType lockType) {
+    switch (lockType) {
+      case EXCLUSIVE:
+        return LOCK_EXCLUSIVE;
+      case SHARED_READ:
+        return LOCK_SHARED;
+      case SHARED_WRITE:
+        return LOCK_SEMI_SHARED;
+      default:
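+        // fallback mirroring the previous default lockChar value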
+        return 'z';
+    }
+  }
+
   private static String normalizeCase(String s) {
     return s == null ? null : s.toLowerCase();
   }
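
Both of the new insert paths above, and the partition-insert path below, share the same JDBC batching pattern: flush every DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE rows, then flush any remainder. A standalone sketch of the pattern (assumes java.sql.* and java.util.List imports; table, column and method names are illustrative, not from the patch):

    static void batchInsert(Connection conn, List<String> values, int batchSize) throws SQLException {
      try (PreparedStatement ps = conn.prepareStatement("INSERT INTO \"T\" (\"C\") VALUES (?)")) {
        int n = 0;
        for (String value : values) {
          ps.setString(1, value);
          ps.addBatch();
          if (++n % batchSize == 0) {
            ps.executeBatch(); // flush each full batch
          }
        }
        if (n % batchSize != 0) {
          ps.executeBatch(); // flush the final partial batch
        }
      }
    }
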
@@ -3271,7 +3318,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       throws NoSuchTxnException,  TxnAbortedException, MetaException {
     Connection dbConn = null;
     Statement stmt = null;
-    List<PreparedStatement> insertPreparedStmts = null;
     try {
       try {
         lockInternal();
@@ -3290,23 +3336,28 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
 
         Long writeId = rqst.getWriteid();
-        List<String> rows = new ArrayList<>();
-        List<List<String>> paramsList = new ArrayList<>();
-        for (String partName : rqst.getPartitionnames()) {
-          rows.add(rqst.getTxnid() + ",?,?,?," + quoteChar(ot.sqlConst) + "," + writeId);
-          List<String> params = new ArrayList<>();
-          params.add(normalizeCase(rqst.getDbname()));
-          params.add(normalizeCase(rqst.getTablename()));
-          params.add(partName);
-          paramsList.add(params);
-        }
-        int modCount = 0;
-        //record partitions that were written to
-        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-            "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")",
-                rows, paramsList);
-        for(PreparedStatement pst : insertPreparedStmts) {
-          modCount = pst.executeUpdate();
+        try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
+          int insertCounter = 0;
+          int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+          for (String partName : rqst.getPartitionnames()) {
+            pstmt.setLong(1, rqst.getTxnid());
+            pstmt.setString(2, normalizeCase(rqst.getDbname()));
+            pstmt.setString(3, normalizeCase(rqst.getTablename()));
+            pstmt.setString(4, partName);
+            pstmt.setString(5, Character.toString(ot.sqlConst));
+            pstmt.setObject(6, writeId);
+
+            pstmt.addBatch();
+            insertCounter++;
+            if (insertCounter % batchSize == 0) {
+              LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+              pstmt.executeBatch();
+            }
+          }
+          if (insertCounter % batchSize != 0) {
+            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+            pstmt.executeBatch();
+          }
         }
         LOG.debug("Going to commit");
         dbConn.commit();
@@ -3317,11 +3368,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to insert into from transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for(PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
         close(null, stmt, dbConn);
         unlockInternal();
       }