You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/04/02 11:39:57 UTC
[hive] branch master updated: HIVE-23052: Optimize lock enqueueing
in TxnHandler (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 8739c58 HIVE-23052: Optimize lock enqueueing in TxnHandler (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)
8739c58 is described below
commit 8739c5838a849cd487ebfe474250d847bb5ac208
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Thu Apr 2 13:37:02 2020 +0200
HIVE-23052: Optimize lock enqueueing in TxnHandler (Marton Bod reviewed by Denys Kuzmenko and Peter Vary)
---
.../datasource/BoneCPDataSourceProvider.java | 1 +
.../datasource/DbCPDataSourceProvider.java | 1 +
.../datasource/HikariCPDataSourceProvider.java | 1 +
.../hadoop/hive/metastore/txn/TxnHandler.java | 436 ++++++++++++---------
4 files changed, 244 insertions(+), 195 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
index f92ce73..de202c9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
@@ -89,6 +89,7 @@ public class BoneCPDataSourceProvider implements DataSourceProvider {
DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl);
switch (dbProduct){
case MYSQL:
+ connProperties.put("allowMultiQueries", true);
connProperties.put("rewriteBatchedStatements", true);
break;
case POSTGRES:
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
index 85719fd..12cacd2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
@@ -71,6 +71,7 @@ public class DbCPDataSourceProvider implements DataSourceProvider {
DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl);
switch (dbProduct){
case MYSQL:
+ dbcpDs.setConnectionProperties("allowMultiQueries=true");
dbcpDs.setConnectionProperties("rewriteBatchedStatements=true");
break;
case POSTGRES:
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
index 76bbf3b..3e871e9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
@@ -76,6 +76,7 @@ public class HikariCPDataSourceProvider implements DataSourceProvider {
switch (dbProduct){
case MYSQL:
config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES");
+ config.addDataSourceProperty("allowMultiQueries", true);
config.addDataSourceProperty("rewriteBatchedStatements", true);
break;
case POSTGRES:
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 74ef885..e8a988c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +53,7 @@ import javax.sql.DataSource;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -179,6 +181,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
+ private static final Long TEMP_HIVE_LOCK_ID = -1L;
private static DataSource connPool;
private static DataSource connPoolMutex;
@@ -190,6 +193,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
"\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
+ private static final String TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"TXN_COMPONENTS\" (" +
+ "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
+ " VALUES (?, ?, ?, ?, ?, ?)";
+ private static final String INCREMENT_NEXT_LOCK_ID_QUERY = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = %s";
+ private static final String UPDATE_HIVE_LOCKS_EXT_ID_QUERY = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = %s WHERE \"HL_LOCK_EXT_ID\" = %s";
+ private static final String SELECT_WRITE_ID_QUERY = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
+ + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?";
+
private List<TransactionalMetaStoreEventListener> transactionalListeners;
/**
@@ -2394,9 +2405,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Connection dbConn = null;
try {
Statement stmt = null;
- PreparedStatement pStmt = null;
- List<PreparedStatement> insertPreparedStmts = null;
- ResultSet rs = null;
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -2410,175 +2418,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
shouldNeverHappen(txnid);
}
}
+ /* Insert txn components and hive locks (with a temp extLockId) first, before getting the next lock ID in a select-for-update (S4U).
+ This should minimize the scope of the S4U and decrease the table lock duration. */
+ insertTxnComponents(txnid, rqst, dbConn);
+ insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst);
+
/** Get the next lock id.
* This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race.
* Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7,
* 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks,
* and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)}
* doesn't block on locks acquired later than one it's checking*/
- String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\"");
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- throw new MetaException("Transaction tables not properly " +
- "initialized, no record found in next_lock_id");
- }
- long extLockId = rs.getLong(1);
- s = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (extLockId + 1);
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
-
- if (txnid > 0) {
- List<String> rows = new ArrayList<>();
- List<List<String>> paramsList = new ArrayList<>();
- // For each component in this lock request,
- // add an entry to the txn_components table
- for (LockComponent lc : rqst.getComponent()) {
- if(lc.isSetIsTransactional() && !lc.isIsTransactional()) {
- //we don't prevent using non-acid resources in a txn but we do lock them
- continue;
- }
- boolean updateTxnComponents;
- if(!lc.isSetOperationType()) {
- //request came from old version of the client
- updateTxnComponents = true;//this matches old behavior
- }
- else {
- switch (lc.getOperationType()) {
- case INSERT:
- case UPDATE:
- case DELETE:
- if(!lc.isSetIsDynamicPartitionWrite()) {
- //must be old client talking, i.e. we don't know if it's DP so be conservative
- updateTxnComponents = true;
- }
- else {
- /**
- * we know this is part of DP operation and so we'll get
- * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
- * of partitions actually chaged.
- */
- updateTxnComponents = !lc.isIsDynamicPartitionWrite();
- }
- break;
- case SELECT:
- updateTxnComponents = false;
- break;
- case NO_TXN:
- /*this constant is a bit of a misnomer since we now always have a txn context. It
- just means the operation is such that we don't care what tables/partitions it
- affected as it doesn't trigger a compaction or conflict detection. A better name
- would be NON_TRANSACTIONAL.*/
- updateTxnComponents = false;
- break;
- default:
- //since we have an open transaction, only 4 values above are expected
- throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
- + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
- }
- }
- if(!updateTxnComponents) {
- continue;
- }
- String dbName = normalizeCase(lc.getDbname());
- String tblName = normalizeCase(lc.getTablename());
- String partName = normalizeCase(lc.getPartitionname());
- Long writeId = null;
- if (tblName != null) {
- // It is assumed the caller have already allocated write id for adding/updating data to
- // the acid tables. However, DDL operatons won't allocate write id and hence this query
- // may return empty result sets.
- // Get the write id allocated by this txn for the given table writes
- s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
- + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid;
- pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName));
- LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
- quoteString(dbName), quoteString(tblName));
- rs = pStmt.executeQuery();
- if (rs.next()) {
- writeId = rs.getLong(1);
- }
- }
- rows.add(txnid + ", ?, " +
- (tblName == null ? "null" : "?") + ", " +
- (partName == null ? "null" : "?")+ "," +
- quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," +
- (writeId == null ? "null" : writeId));
- List<String> params = new ArrayList<>();
- params.add(dbName);
- if (tblName != null) {
- params.add(tblName);
- }
- if (partName != null) {
- params.add(partName);
- }
- paramsList.add(params);
- }
- insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
- "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")",
- rows, paramsList);
- for(PreparedStatement pst : insertPreparedStmts) {
- int modCount = pst.executeUpdate();
- closeStmt(pst);
- }
- insertPreparedStmts = null;
- }
- String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
- String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
-
- int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
- long intLockId = 0;
-
- try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
- for (LockComponent lc : rqst.getComponent()) {
- if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
- (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
- //Old version of thrift client should have (lc.isSetOperationType() == false), but they do not
- //If you add a default value to a variable, isSet() for that variable is true regardless of the where the
- //message was created (for object variables).
- //It works correctly for boolean vars, e.g. LockComponent.isTransactional).
- //in test mode, upgrades are not tested, so client version and server version of thrift always matches so
- //we see UNSET here it means something didn't set the appropriate value.
- throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
- + lc + " agentInfo=" + rqst.getAgentInfo());
- }
- intLockId++;
-
- char lockChar = 'z';
- switch (lc.getType()) {
- case EXCLUSIVE:
- lockChar = LOCK_EXCLUSIVE;
- break;
- case SHARED_READ:
- lockChar = LOCK_SHARED;
- break;
- case SHARED_WRITE:
- lockChar = LOCK_SEMI_SHARED;
- break;
- }
- pstmt.setLong(1, extLockId);
- pstmt.setLong(2, intLockId);
- pstmt.setLong(3, txnid);
- pstmt.setString(4, normalizeCase(lc.getDbname()));
- pstmt.setString(5, normalizeCase(lc.getTablename()));
- pstmt.setString(6, normalizeCase(lc.getPartitionname()));
- pstmt.setString(7, Character.toString(LOCK_WAITING));
- pstmt.setString(8, Character.toString(lockChar));
- pstmt.setString(9, rqst.getUser());
- pstmt.setString(10, rqst.getHostname());
- pstmt.setString(11, rqst.getAgentInfo());
+ long extLockId = getNextLockIdForUpdate(dbConn, stmt);
+ incrementLockIdAndUpdateHiveLocks(stmt, extLockId);
- pstmt.addBatch();
- if (intLockId % batchSize == 0) {
- pstmt.executeBatch();
- }
- }
- if (intLockId % batchSize != 0) {
- pstmt.executeBatch();
- }
- }
dbConn.commit();
success = true;
return new ConnectionLockIdPair(dbConn, extLockId);
@@ -2589,13 +2442,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database " +
StringUtils.stringifyException(e));
} finally {
- if (insertPreparedStmts != null) {
- for (PreparedStatement pst : insertPreparedStmts) {
- closeStmt(pst);
- }
- }
- closeStmt(pStmt);
- close(rs, stmt, null);
+ closeStmt(stmt);
if (!success) {
/* This needs to return a "live" connection to be used by operation that follows it.
Thus it only closes Connection on failure/retry. */
@@ -2608,6 +2455,206 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return enqueueLockWithRetry(rqst);
}
}
+
+ private long getNextLockIdForUpdate(Connection dbConn, Statement stmt) throws SQLException, MetaException {
+ String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\"");
+ LOG.debug("Going to execute query <" + s + ">");
+ try (ResultSet rs = stmt.executeQuery(s)) {
+ if (!rs.next()) {
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ throw new MetaException("Transaction tables not properly " +
+ "initialized, no record found in next_lock_id");
+ }
+ return rs.getLong(1);
+ }
+ }
+
+ private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId) throws SQLException {
+ String incrCmd = String.format(INCREMENT_NEXT_LOCK_ID_QUERY, (extLockId + 1));
+ // update hive locks entries with the real EXT_LOCK_ID (replace temp ID)
+ String updateLocksCmd = String.format(UPDATE_HIVE_LOCKS_EXT_ID_QUERY, extLockId, TEMP_HIVE_LOCK_ID);
+ LOG.debug("Going to execute updates in batch: <" + incrCmd + ">, and <" + updateLocksCmd + ">");
+ stmt.addBatch(incrCmd);
+ stmt.addBatch(updateLocksCmd);
+ stmt.executeBatch();
+ }
+
+ private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn) throws SQLException {
+ if (txnid > 0) {
+ Map<Pair<String, String>, Optional<Long>> writeIdCache = new HashMap<>();
+ try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
+ // For each component in this lock request,
+ // add an entry to the txn_components table
+ int insertCounter = 0;
+ int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+ for (LockComponent lc : rqst.getComponent()) {
+ if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
+ //we don't prevent using non-acid resources in a txn but we do lock them
+ continue;
+ }
+ if (!shouldUpdateTxnComponent(txnid, rqst, lc)) {
+ continue;
+ }
+ String dbName = normalizeCase(lc.getDbname());
+ String tblName = normalizeCase(lc.getTablename());
+ String partName = normalizeCase(lc.getPartitionname());
+ Optional<Long> writeId = getWriteId(writeIdCache, dbName, tblName, txnid, dbConn);
+
+ pstmt.setLong(1, txnid);
+ pstmt.setString(2, dbName);
+ pstmt.setString(3, tblName);
+ pstmt.setString(4, partName);
+ pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).toString());
+ pstmt.setObject(6, writeId.orElse(null));
+
+ pstmt.addBatch();
+ insertCounter++;
+ if (insertCounter % batchSize == 0) {
+ LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+ pstmt.executeBatch();
+ }
+ }
+ if (insertCounter % batchSize != 0) {
+ LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+ pstmt.executeBatch();
+ }
+ }
+ }
+ }
+
+ private Optional<Long> getWriteId(Map<Pair<String, String>, Optional<Long>> writeIdCache, String dbName, String tblName, long txnid, Connection dbConn) throws SQLException {
+ /* we can cache writeIDs based on dbName and tblName because txnid is invariant and
+ partitionName is not part of the writeID select query */
+ Pair<String, String> dbAndTable = Pair.of(dbName, tblName);
+ if (writeIdCache.containsKey(dbAndTable)) {
+ return writeIdCache.get(dbAndTable);
+ } else {
+ Optional<Long> writeId = getWriteIdFromDb(txnid, dbConn, dbName, tblName);
+ writeIdCache.put(dbAndTable, writeId);
+ return writeId;
+ }
+ }
+
+ private Optional<Long> getWriteIdFromDb(long txnid, Connection dbConn, String dbName, String tblName) throws SQLException {
+ if (tblName != null) {
+ // It is assumed the caller has already allocated a write id for adding/updating data in
+ // the acid tables. However, DDL operations won't allocate a write id and hence this query
+ // may return an empty result set.
+ // Get the write id allocated by this txn for the given table writes
+ try (PreparedStatement pstmt = dbConn.prepareStatement(SELECT_WRITE_ID_QUERY)) {
+ pstmt.setString(1, dbName);
+ pstmt.setString(2, tblName);
+ pstmt.setLong(3, txnid);
+ LOG.debug("Going to execute query <" + SELECT_WRITE_ID_QUERY + ">");
+ try (ResultSet rs = pstmt.executeQuery()) {
+ if (rs.next()) {
+ return Optional.of(rs.getLong(1));
+ }
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ private boolean shouldUpdateTxnComponent(long txnid, LockRequest rqst, LockComponent lc) {
+ if(!lc.isSetOperationType()) {
+ //request came from old version of the client
+ return true; //this matches old behavior
+ }
+ else {
+ switch (lc.getOperationType()) {
+ case INSERT:
+ case UPDATE:
+ case DELETE:
+ if(!lc.isSetIsDynamicPartitionWrite()) {
+ //must be old client talking, i.e. we don't know if it's DP so be conservative
+ return true;
+ }
+ else {
+ /**
+ * we know this is part of a DP operation and so we'll get an
+ * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
+ * of partitions actually changed.
+ */
+ return !lc.isIsDynamicPartitionWrite();
+ }
+ case SELECT:
+ return false;
+ case NO_TXN:
+ /*this constant is a bit of a misnomer since we now always have a txn context. It
+ just means the operation is such that we don't care what tables/partitions it
+ affected as it doesn't trigger a compaction or conflict detection. A better name
+ would be NON_TRANSACTIONAL.*/
+ return false;
+ default:
+ //since we have an open transaction, only the 5 values above are expected
+ throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
+ + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
+ }
+ }
+ }
+
+ private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException {
+
+ String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
+ String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
+ int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+ long intLockId = 0;
+
+ try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
+ for (LockComponent lc : rqst.getComponent()) {
+ if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
+ (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
+ //Old versions of the thrift client should have (lc.isSetOperationType() == false), but they do not.
+ //If you add a default value to a variable, isSet() for that variable is true regardless of where the
+ //message was created (for object variables).
+ //It works correctly for boolean vars (e.g. LockComponent.isTransactional).
+ //In test mode, upgrades are not tested, so the client and server versions of thrift always match; if
+ //we see UNSET here it means something didn't set the appropriate value.
+ throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
+ + lc + " agentInfo=" + rqst.getAgentInfo());
+ }
+ intLockId++;
+
+ pstmt.setLong(1, TEMP_HIVE_LOCK_ID);
+ pstmt.setLong(2, intLockId);
+ pstmt.setLong(3, txnid);
+ pstmt.setString(4, normalizeCase(lc.getDbname()));
+ pstmt.setString(5, normalizeCase(lc.getTablename()));
+ pstmt.setString(6, normalizeCase(lc.getPartitionname()));
+ pstmt.setString(7, Character.toString(LOCK_WAITING));
+ pstmt.setString(8, Character.toString(getLockChar(lc.getType())));
+ pstmt.setString(9, rqst.getUser());
+ pstmt.setString(10, rqst.getHostname());
+ pstmt.setString(11, rqst.getAgentInfo());
+
+ pstmt.addBatch();
+ if (intLockId % batchSize == 0) {
+ LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + batchSize);
+ pstmt.executeBatch();
+ }
+ }
+ if (intLockId % batchSize != 0) {
+ LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % batchSize);
+ pstmt.executeBatch();
+ }
+ }
+ }
+
+ private char getLockChar(LockType lockType) {
+ switch (lockType) {
+ case EXCLUSIVE:
+ return LOCK_EXCLUSIVE;
+ case SHARED_READ:
+ return LOCK_SHARED;
+ case SHARED_WRITE:
+ return LOCK_SEMI_SHARED;
+ default:
+ return 'z';
+ }
+ }
+
private static String normalizeCase(String s) {
return s == null ? null : s.toLowerCase();
}
@@ -3271,7 +3318,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throws NoSuchTxnException, TxnAbortedException, MetaException {
Connection dbConn = null;
Statement stmt = null;
- List<PreparedStatement> insertPreparedStmts = null;
try {
try {
lockInternal();
@@ -3290,23 +3336,28 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
Long writeId = rqst.getWriteid();
- List<String> rows = new ArrayList<>();
- List<List<String>> paramsList = new ArrayList<>();
- for (String partName : rqst.getPartitionnames()) {
- rows.add(rqst.getTxnid() + ",?,?,?," + quoteChar(ot.sqlConst) + "," + writeId);
- List<String> params = new ArrayList<>();
- params.add(normalizeCase(rqst.getDbname()));
- params.add(normalizeCase(rqst.getTablename()));
- params.add(partName);
- paramsList.add(params);
- }
- int modCount = 0;
- //record partitions that were written to
- insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
- "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")",
- rows, paramsList);
- for(PreparedStatement pst : insertPreparedStmts) {
- modCount = pst.executeUpdate();
+ try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
+ int insertCounter = 0;
+ int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+ for (String partName : rqst.getPartitionnames()) {
+ pstmt.setLong(1, rqst.getTxnid());
+ pstmt.setString(2, normalizeCase(rqst.getDbname()));
+ pstmt.setString(3, normalizeCase(rqst.getTablename()));
+ pstmt.setString(4, partName);
+ pstmt.setString(5, Character.toString(ot.sqlConst));
+ pstmt.setObject(6, writeId);
+
+ pstmt.addBatch();
+ insertCounter++;
+ if (insertCounter % batchSize == 0) {
+ LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+ pstmt.executeBatch();
+ }
+ }
+ if (insertCounter % batchSize != 0) {
+ LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+ pstmt.executeBatch();
+ }
}
LOG.debug("Going to commit");
dbConn.commit();
@@ -3317,11 +3368,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to insert into from transaction database " +
StringUtils.stringifyException(e));
} finally {
- if (insertPreparedStmts != null) {
- for(PreparedStatement pst : insertPreparedStmts) {
- closeStmt(pst);
- }
- }
close(null, stmt, dbConn);
unlockInternal();
}