Posted to commits@hive.apache.org by dk...@apache.org on 2020/04/07 10:15:39 UTC
[hive] branch master updated: HIVE-23067: Use batch DB calls in
TxnHandler for commitTxn and abortTxns (Marton Bod reviewed by Peter Vary
and Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9e8ce3f HIVE-23067: Use batch DB calls in TxnHandler for commitTxn and abortTxns (Marton Bod reviewed by Peter Vary and Denys Kuzmenko)
9e8ce3f is described below
commit 9e8ce3f610cdef6913fe1ebe893d9e1bc93c4123
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Tue Apr 7 12:11:19 2020 +0200
HIVE-23067: Use batch DB calls in TxnHandler for commitTxn and abortTxns (Marton Bod reviewed by Peter Vary and Denys Kuzmenko)
---
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 32 +++++
.../hadoop/hive/metastore/txn/TxnHandler.java | 152 ++++++++-------------
2 files changed, 91 insertions(+), 93 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index ef88240..620c77e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -26,7 +26,10 @@ import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;
import java.sql.Statement;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumMap;
+import java.util.List;
import java.util.Properties;
import com.google.common.annotations.VisibleForTesting;
@@ -629,4 +632,33 @@ public final class TxnDbUtil {
throw new MetaException(msg);
}
}
+
+ /**
+ * @param stmt Statement which will be used for batching and execution.
+ * @param queries List of sql queries to execute in a Statement batch.
+ * @param conf Configuration for retrieving max batch size param
+ * @return A list with the number of rows affected by each query in queries.
+ * @throws SQLException Thrown if an execution error occurs.
+ */
+ static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, Configuration conf) throws SQLException {
+ List<Integer> affectedRowsByQuery = new ArrayList<>();
+ int queryCounter = 0;
+ int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+ for (String query : queries) {
+ LOG.debug("Adding query to batch: <" + query + ">");
+ queryCounter++;
+ stmt.addBatch(query);
+ if (queryCounter % batchSize == 0) {
+ LOG.debug("Going to execute queries in batch. Batch size: " + batchSize);
+ int[] affectedRecordsByQuery = stmt.executeBatch();
+ Arrays.stream(affectedRecordsByQuery).forEach(affectedRowsByQuery::add);
+ }
+ }
+ if (queryCounter % batchSize != 0) {
+ LOG.debug("Going to execute queries in batch. Batch size: " + queryCounter % batchSize);
+ int[] affectedRecordsByQuery = stmt.executeBatch();
+ Arrays.stream(affectedRecordsByQuery).forEach(affectedRowsByQuery::add);
+ }
+ return affectedRowsByQuery;
+ }
}
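The helper added above flushes the JDBC batch every DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE
statements and once more for any trailing partial batch, returning the per-query update
counts in order. A minimal usage sketch follows (hypothetical caller: the method is
package-private, so this assumes code living in org.apache.hadoop.hive.metastore.txn,
where dbConn and conf stand in for an open JDBC Connection and the metastore
Configuration; the txn id 42 is illustrative):

    import java.sql.Statement;
    import java.util.Arrays;
    import java.util.List;

    try (Statement stmt = dbConn.createStatement()) {
      List<String> queries = Arrays.asList(
          "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = 42",
          "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = 42");
      // Both DELETEs are sent to the DB in a single JDBC batch (batch size permitting),
      // instead of one round trip per statement.
      List<Integer> affected = TxnDbUtil.executeQueriesInBatch(stmt, queries, conf);
      // affected.get(0) holds the rows deleted by the first query, and so on.
    }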
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e8a988c..7d0db0c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL;
+import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatch;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
@@ -192,7 +193,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
"\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " +
"\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
-
private static final String TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"TXN_COMPONENTS\" (" +
"\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
" VALUES (?, ?, ?, ?, ?, ?)";
@@ -200,6 +200,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private static final String UPDATE_HIVE_LOCKS_EXT_ID_QUERY = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = %s WHERE \"HL_LOCK_EXT_ID\" = %s";
private static final String SELECT_WRITE_ID_QUERY = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
+ " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?";
+ private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " +
+ "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" +
+ " VALUES (%s, ?, ?, ?, ?, %s)";
private List<TransactionalMetaStoreEventListener> transactionalListeners;
@@ -1120,7 +1123,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
Connection dbConn = null;
Statement stmt = null;
- List<PreparedStatement> insertPreparedStmts = null;
ResultSet commitIdRs = null, rs;
try {
lockInternal();
@@ -1302,47 +1304,32 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
} else {
if (rqst.isSetWriteEventInfos()) {
- List<String> rows = new ArrayList<>();
- List<List<String>> paramsList = new ArrayList<>();
- for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
- rows.add(txnid + ", ?, ?, ?," +
- writeEventInfo.getWriteId() + "," +
- quoteChar(isUpdateDelete));
- List<String> params = new ArrayList<>();
- params.add(writeEventInfo.getDatabase());
- params.add(writeEventInfo.getTable());
- params.add(writeEventInfo.getPartition());
- paramsList.add(params);
- }
- insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
- "\"COMPLETED_TXN_COMPONENTS\" " +
- "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")",
- rows, paramsList);
- for (PreparedStatement pst : insertPreparedStmts) {
- pst.execute();
+ String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete));
+ try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
+ int insertCounter = 0;
+ int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+ for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
+ pstmt.setString(1, writeEventInfo.getDatabase());
+ pstmt.setString(2, writeEventInfo.getTable());
+ pstmt.setString(3, writeEventInfo.getPartition());
+ pstmt.setLong(4, writeEventInfo.getWriteId());
+
+ pstmt.addBatch();
+ insertCounter++;
+ if (insertCounter % batchSize == 0) {
+ LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + batchSize);
+ pstmt.executeBatch();
+ }
+ }
+ if (insertCounter % batchSize != 0) {
+ LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % batchSize);
+ pstmt.executeBatch();
+ }
}
}
deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
}
-
- // cleanup all txn related metadata
- s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
- s = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
- s = "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
- s = "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
- LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
-
- s = "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
+ cleanUpTxnRelatedMetadata(txnid, stmt);
// update the key/value associated with the transaction if it has been
// set
@@ -1391,11 +1378,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
- if (insertPreparedStmts != null) {
- for (PreparedStatement pst : insertPreparedStmts) {
- closeStmt(pst);
- }
- }
close(commitIdRs, stmt, dbConn);
unlockInternal();
}
@@ -1404,6 +1386,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLException {
+ List<String> queries = Arrays.asList(
+ "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid,
+ "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid,
+ "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid,
+ "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid,
+ "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
+ executeQueriesInBatch(stmt, queries, conf);
+ LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
+ }
+
/**
* Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
* @param rqst info on table/partitions and writeid snapshot to replicate.
@@ -2695,12 +2688,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
* We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
*
- * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking
+ * {@link #checkLock(java.sql.Connection, long, long)} must run at SERIALIZABLE (make sure some lock we are checking
* against doesn't move from W to A in another txn) but this method can heartbeat in
* separate txn at READ_COMMITTED.
*
* Retry-by-caller note:
- * Retryable because {@link #checkLock(Connection, long)} is
+ * Retryable because {@link #checkLock(Connection, long, long)} is
*/
@Override
@RetrySemantics.SafeToRetry
@@ -4266,7 +4259,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbeat, boolean isStrict)
throws SQLException, MetaException {
Statement stmt = null;
- int updateCnt = 0;
if (txnids.isEmpty()) {
return 0;
}
@@ -4275,70 +4267,44 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//This is an update statement, thus at any Isolation level will take Write locks so will block
//all other ops using S4U on TXNS row.
List<String> queries = new ArrayList<>();
-
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
- prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
- " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND ");
- if(checkHeartbeat) {
- suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ");
- suffix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
+ // add update txns queries to query list
+ prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = ").append(quoteChar(TXN_ABORTED))
+ .append(" WHERE \"TXN_STATE\" = ").append(quoteChar(TXN_OPEN)).append(" AND ");
+ if (checkHeartbeat) {
+ suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ")
+ .append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
}
-
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
+ int numUpdateQueries = queries.size();
- for (String query : queries) {
- LOG.debug("Going to execute update <" + query + ">");
- updateCnt += stmt.executeUpdate(query);
- }
-
- // As current txn is aborted, this won't read any data from other txns. So, it is safe to unregister
- // the min_open_txnid from MIN_HISTORY_LEVEL for the aborted txns. Even if the txns in the list are
- // partially aborted, it is safe to delete from MIN_HISTORY_LEVEL as the remaining txns are either
- // already committed or aborted.
- queries.clear();
+ // add delete min history queries to query list
prefix.setLength(0);
suffix.setLength(0);
-
prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE ");
-
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false);
- for (String query : queries) {
- LOG.debug("Going to execute update <" + query + ">");
- int rc = stmt.executeUpdate(query);
- LOG.debug("Deleted " + rc + " records from MIN_HISTORY_LEVEL");
- }
- LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
-
- if (updateCnt < txnids.size() && isStrict) {
- /**
- * have to bail in this case since we don't know which transactions were not Aborted and
- * thus don't know which locks to delete
- * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)} operation and
- * {@link #performTimeOuts()}
- */
- return updateCnt;
- }
-
- queries.clear();
+ // add delete hive locks queries to query list
prefix.setLength(0);
- suffix.setLength(0);
-
prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE ");
-
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
- for (String query : queries) {
- LOG.debug("Going to execute update <" + query + ">");
- int rc = stmt.executeUpdate(query);
- LOG.debug("Removed " + rc + " records from HIVE_LOCKS");
- }
+ // execute all queries in the list in one batch
+ List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
+ LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
+ return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
} finally {
closeStmt(stmt);
}
- return updateCnt;
+ }
+
+ private int getUpdateCount(int numUpdateQueries, List<Integer> affectedRowsByQuery) {
+ return affectedRowsByQuery.stream()
+ .limit(numUpdateQueries)
+ .mapToInt(Integer::intValue)
+ .sum();
}
private static boolean isValidTxn(long txnId) {
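The commitTxn path applies the same flush-every-batchSize pattern with a PreparedStatement
for the COMPLETED_TXN_COMPONENTS insert. A self-contained sketch of that pattern is below
(hypothetical table and data against an in-memory Derby database, assuming the embedded
Derby driver is on the classpath as in metastore tests; the commit reads the batch size
from DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE rather than hard-coding it):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class BatchInsertSketch {
      public static void main(String[] args) throws Exception {
        // TxnHandler reads this from MetastoreConf; hard-coded to keep the sketch self-contained.
        int batchSize = 1000;
        try (Connection conn =
                 DriverManager.getConnection("jdbc:derby:memory:demo;create=true")) {
          try (Statement ddl = conn.createStatement()) {
            ddl.execute("CREATE TABLE T (ID INT, NAME VARCHAR(64))");
          }
          try (PreparedStatement pstmt =
                   conn.prepareStatement("INSERT INTO T (ID, NAME) VALUES (?, ?)")) {
            int insertCounter = 0;
            for (int i = 0; i < 2500; i++) {
              pstmt.setInt(1, i);
              pstmt.setString(2, "row-" + i);
              pstmt.addBatch();
              insertCounter++;
              if (insertCounter % batchSize == 0) {
                pstmt.executeBatch();   // flush a full batch
              }
            }
            if (insertCounter % batchSize != 0) {
              pstmt.executeBatch();     // flush the trailing partial batch
            }
          }
        }
      }
    }

Rows accumulate client-side via addBatch() and reach the database in chunks of at most
batchSize, which is the round-trip saving the commit is after.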