You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/04/07 10:15:39 UTC

[hive] branch master updated: HIVE-23067: Use batch DB calls in TxnHandler for commitTxn and abortTxns (Marton Bod reviewed by Peter Vary and Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e8ce3f  HIVE-23067: Use batch DB calls in TxnHandler for commitTxn and abortTxns (Marton Bod reviewed by Peter Vary and Denys Kuzmenko)
9e8ce3f is described below

commit 9e8ce3f610cdef6913fe1ebe893d9e1bc93c4123
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Tue Apr 7 12:11:19 2020 +0200

    HIVE-23067: Use batch DB calls in TxnHandler for commitTxn and abortTxns (Marton Bod reviewed by Peter Vary and Denys Kuzmenko)
---
 .../hadoop/hive/metastore/txn/TxnDbUtil.java       |  32 +++++
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 152 ++++++++-------------
 2 files changed, 91 insertions(+), 93 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index ef88240..620c77e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -26,7 +26,10 @@ import java.sql.SQLException;
 import java.sql.SQLTransactionRollbackException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumMap;
+import java.util.List;
 import java.util.Properties;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -629,4 +632,33 @@ public final class TxnDbUtil {
       throw new MetaException(msg);
     }
   }
+
+  /** Adds the queries to {@code stmt} and runs them in JDBC batches of at most DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE.
+   * @param stmt Statement used for both {@code addBatch} and {@code executeBatch}; it is not closed here.
+   * @param queries SQL strings to execute, in order; a trailing partial batch is flushed after the loop.
+   * @param conf Configuration the batch size is read from (a value of 0 makes the modulo checks throw ArithmeticException).
+   * @return The values returned by {@code executeBatch} for all batches, concatenated in the order of {@code queries}.
+   * @throws SQLException If {@code addBatch} or {@code executeBatch} fails.
+   */
+  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, Configuration conf) throws SQLException {
+    List<Integer> affectedRowsByQuery = new ArrayList<>();
+    int queryCounter = 0;
+    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+    for (String query : queries) {
+      LOG.debug("Adding query to batch: <" + query + ">");
+      queryCounter++;
+      stmt.addBatch(query);
+      if (queryCounter % batchSize == 0) { // a full batch has accumulated: flush it
+        LOG.debug("Going to execute queries in batch. Batch size: " + batchSize);
+        int[] affectedRecordsByQuery = stmt.executeBatch();
+        Arrays.stream(affectedRecordsByQuery).forEach(affectedRowsByQuery::add);
+      }
+    }
+    if (queryCounter % batchSize != 0) { // flush the last, partially filled batch (skipped when nothing is pending)
+      LOG.debug("Going to execute queries in batch. Batch size: " + queryCounter % batchSize);
+      int[] affectedRecordsByQuery = stmt.executeBatch();
+      Arrays.stream(affectedRecordsByQuery).forEach(affectedRowsByQuery::add);
+    }
+    return affectedRowsByQuery;
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e8a988c..7d0db0c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL;
+import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatch;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
@@ -192,7 +193,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " +
       "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
-
   private static final String TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"TXN_COMPONENTS\" (" +
           "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
           " VALUES (?, ?, ?, ?, ?, ?)";
@@ -200,6 +200,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   private static final String UPDATE_HIVE_LOCKS_EXT_ID_QUERY = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = %s WHERE \"HL_LOCK_EXT_ID\" = %s";
   private static final String SELECT_WRITE_ID_QUERY = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
           + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?";
+  private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " +
+          "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" +
+          " VALUES (%s, ?, ?, ?, ?, %s)";
 
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
 
@@ -1120,7 +1123,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
-      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet commitIdRs = null, rs;
       try {
         lockInternal();
@@ -1302,47 +1304,32 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
         } else {
           if (rqst.isSetWriteEventInfos()) {
-            List<String> rows = new ArrayList<>();
-            List<List<String>> paramsList = new ArrayList<>();
-            for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
-              rows.add(txnid + ", ?, ?, ?," +
-                      writeEventInfo.getWriteId() + "," +
-                      quoteChar(isUpdateDelete));
-              List<String> params = new ArrayList<>();
-              params.add(writeEventInfo.getDatabase());
-              params.add(writeEventInfo.getTable());
-              params.add(writeEventInfo.getPartition());
-              paramsList.add(params);
-            }
-            insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-                    "\"COMPLETED_TXN_COMPONENTS\" " +
-                   "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")",
-                    rows, paramsList);
-            for (PreparedStatement pst : insertPreparedStmts) {
-              pst.execute();
+            String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete));
+            try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
+              int insertCounter = 0;
+              int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+              for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
+                pstmt.setString(1, writeEventInfo.getDatabase());
+                pstmt.setString(2, writeEventInfo.getTable());
+                pstmt.setString(3, writeEventInfo.getPartition());
+                pstmt.setLong(4, writeEventInfo.getWriteId());
+
+                pstmt.addBatch();
+                insertCounter++;
+                if (insertCounter % batchSize == 0) {
+                  LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + batchSize);
+                  pstmt.executeBatch();
+                }
+              }
+              if (insertCounter % batchSize != 0) {
+                LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % batchSize);
+                pstmt.executeBatch();
+              }
             }
           }
           deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
-
-        // cleanup all txn related metadata
-        s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
-
-        s = "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
+        cleanUpTxnRelatedMetadata(txnid, stmt);
 
         // update the key/value associated with the transaction if it has been
         // set
@@ -1391,11 +1378,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for (PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
         close(commitIdRs, stmt, dbConn);
         unlockInternal();
       }
@@ -1404,6 +1386,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLException { // deletes the rows keyed by txnid from five txn metadata tables via executeQueriesInBatch
+    List<String> queries = Arrays.asList( // one DELETE per table, each filtered on that table's txn id column
+        "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid,
+        "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid,
+        "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid,
+        "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid,
+        "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
+    executeQueriesInBatch(stmt, queries, conf); // the returned per-query counts are not inspected
+    LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); // NOTE(review): message names only MIN_HISTORY_LEVEL although all five deletes were issued
+  }
+
   /**
    * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
    * @param rqst info on table/partitions and writeid snapshot to replicate.
@@ -2695,12 +2688,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
    * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
    *
-   * {@link #checkLock(java.sql.Connection, long)}  must run at SERIALIZABLE (make sure some lock we are checking
+   * {@link #checkLock(java.sql.Connection, long, long)}  must run at SERIALIZABLE (make sure some lock we are checking
    * against doesn't move from W to A in another txn) but this method can heartbeat in
    * separate txn at READ_COMMITTED.
    *
    * Retry-by-caller note:
-   * Retryable because {@link #checkLock(Connection, long)} is
+   * Retryable because {@link #checkLock(Connection, long, long)} is
    */
   @Override
   @RetrySemantics.SafeToRetry
@@ -4266,7 +4259,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbeat, boolean isStrict)
       throws SQLException, MetaException {
     Statement stmt = null;
-    int updateCnt = 0;
     if (txnids.isEmpty()) {
       return 0;
     }
@@ -4275,70 +4267,44 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       //This is an update statement, thus at any Isolation level will take Write locks so will block
       //all other ops using S4U on TXNS row.
       List<String> queries = new ArrayList<>();
-
       StringBuilder prefix = new StringBuilder();
       StringBuilder suffix = new StringBuilder();
 
-      prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
-        " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND ");
-      if(checkHeartbeat) {
-        suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ");
-        suffix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
+      // add update txns queries to query list
+      prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = ").append(quoteChar(TXN_ABORTED))
+              .append(" WHERE \"TXN_STATE\" = ").append(quoteChar(TXN_OPEN)).append(" AND ");
+      if (checkHeartbeat) {
+        suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ")
+                .append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
       }
-
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
+      int numUpdateQueries = queries.size();
 
-      for (String query : queries) {
-        LOG.debug("Going to execute update <" + query + ">");
-        updateCnt += stmt.executeUpdate(query);
-      }
-
-      // As current txn is aborted, this won't read any data from other txns. So, it is safe to unregister
-      // the min_open_txnid from MIN_HISTORY_LEVEL for the aborted txns. Even if the txns in the list are
-      // partially aborted, it is safe to delete from MIN_HISTORY_LEVEL as the remaining txns are either
-      // already committed or aborted.
-      queries.clear();
+      // add delete min history queries to query list
       prefix.setLength(0);
       suffix.setLength(0);
-
       prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE ");
-
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false);
 
-      for (String query : queries) {
-        LOG.debug("Going to execute update <" + query + ">");
-        int rc = stmt.executeUpdate(query);
-        LOG.debug("Deleted " + rc + " records from MIN_HISTORY_LEVEL");
-      }
-      LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
-
-      if (updateCnt < txnids.size() && isStrict) {
-        /**
-         * have to bail in this case since we don't know which transactions were not Aborted and
-         * thus don't know which locks to delete
-         * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)}  operation and
-         * {@link #performTimeOuts()}
-         */
-        return updateCnt;
-      }
-
-      queries.clear();
+      // add delete hive locks queries to query list
       prefix.setLength(0);
-      suffix.setLength(0);
-
       prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE ");
-
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
 
-      for (String query : queries) {
-        LOG.debug("Going to execute update <" + query + ">");
-        int rc = stmt.executeUpdate(query);
-        LOG.debug("Removed " + rc + " records from HIVE_LOCKS");
-      }
+      // execute all queries in the list in one batch
+      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
+      LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
+      return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
     } finally {
       closeStmt(stmt);
     }
-    return updateCnt;
+  }
+
+  private int getUpdateCount(int numUpdateQueries, List<Integer> affectedRowsByQuery) { // sums the batch results of the first numUpdateQueries queries
+    return affectedRowsByQuery.stream()
+            .limit(numUpdateQueries) // abortTxns queues its UPDATE TXNS queries first; the later entries belong to the DELETE queries
+            .mapToInt(Integer::intValue)
+            .sum(); // 0 when the list is empty or numUpdateQueries is 0
+  }
 
   private static boolean isValidTxn(long txnId) {