You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/04/17 15:37:04 UTC
[hive] branch master updated: HIVE-23093: Create new metastore
config value for jdbc max batch size (Marton Bod via Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 51dda9a HIVE-23093: Create new metastore config value for jdbc max batch size (Marton Bod via Peter Vary)
51dda9a is described below
commit 51dda9aa63d6b29ba8642f0497a5d0ce451444ef
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Fri Apr 17 17:36:26 2020 +0200
HIVE-23093: Create new metastore config value for jdbc max batch size (Marton Bod via Peter Vary)
---
.../hadoop/hive/metastore/conf/MetastoreConf.java | 6 ++++
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 5 ++-
.../hadoop/hive/metastore/txn/TxnHandler.java | 42 +++++++++++-----------
3 files changed, 28 insertions(+), 25 deletions(-)
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 3bfb0e6..f6097f7 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -644,6 +644,12 @@ public class MetastoreConf {
"However, it doesn't work correctly with integral values that are not normalized (e.g. have\n" +
"leading zeroes, like 0012). If metastore direct SQL is enabled and works, this optimization\n" +
"is also irrelevant."),
+ // Once exceeded, the queries should be broken into separate batches.
+ // Note: This value is not passed into the JDBC driver, therefore this batch size limit is not automatically enforced.
+ // Batch construction/splits should be done manually in code using this config value.
+ JDBC_MAX_BATCH_SIZE("metastore.jdbc.max.batch.size", "hive.metastore.jdbc.max.batch.size",
+ 1000, new RangeValidator(1, null),
+ "Maximum number of update/delete/insert queries in a single JDBC batch statement (including Statement/PreparedStatement)."),
KERBEROS_KEYTAB_FILE("metastore.kerberos.keytab.file",
"hive.metastore.kerberos.keytab.file", "",
"The path to the Kerberos Keytab file containing the metastore Thrift server's service principal."),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index a66e169..bb29410 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -637,14 +637,13 @@ public final class TxnDbUtil {
/**
* @param stmt Statement which will be used for batching and execution.
* @param queries List of sql queries to execute in a Statement batch.
- * @param conf Configuration for retrieving max batch size param
+ * @param batchSize maximum number of queries in a single batch
* @return A list with the number of rows affected by each query in queries.
* @throws SQLException Thrown if an execution error occurs.
*/
- static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, Configuration conf) throws SQLException {
+ static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, int batchSize) throws SQLException {
List<Integer> affectedRowsByQuery = new ArrayList<>();
int queryCounter = 0;
- int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
for (String query : queries) {
LOG.debug("Adding query to batch: <" + query + ">");
queryCounter++;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 962a63d..d080df4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -269,6 +269,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// (End user) Transaction timeout, in milliseconds.
private long timeout;
+ private int maxBatchSize;
private String identifierQuoteString; // quotes to use for quoting tables, where necessary
private long retryInterval;
private int retryLimit;
@@ -346,6 +347,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
deadlockRetryInterval = retryInterval / 10;
maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
+ maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
try {
transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners(
@@ -1221,7 +1223,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete));
try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
int insertCounter = 0;
- int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
pstmt.setString(1, writeEventInfo.getDatabase());
pstmt.setString(2, writeEventInfo.getTable());
@@ -1230,13 +1231,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
pstmt.addBatch();
insertCounter++;
- if (insertCounter % batchSize == 0) {
- LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + batchSize);
+ if (insertCounter % maxBatchSize == 0) {
+ LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + maxBatchSize);
pstmt.executeBatch();
}
}
- if (insertCounter % batchSize != 0) {
- LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % batchSize);
+ if (insertCounter % maxBatchSize != 0) {
+ LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % maxBatchSize);
pstmt.executeBatch();
}
}
@@ -1344,7 +1345,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
// execute all in one batch
- executeQueriesInBatch(stmt, queryBatch, conf);
+ executeQueriesInBatch(stmt, queryBatch, maxBatchSize);
}
private void updateKeyValueAssociatedWithTxn(CommitTxnRequest rqst, Statement stmt) throws SQLException {
@@ -2460,7 +2461,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// For each component in this lock request,
// add an entry to the txn_components table
int insertCounter = 0;
- int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
for (LockComponent lc : rqst.getComponent()) {
if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
//we don't prevent using non-acid resources in a txn but we do lock them
@@ -2483,13 +2483,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
pstmt.addBatch();
insertCounter++;
- if (insertCounter % batchSize == 0) {
- LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+ if (insertCounter % maxBatchSize == 0) {
+ LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
pstmt.executeBatch();
}
}
- if (insertCounter % batchSize != 0) {
- LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+ if (insertCounter % maxBatchSize != 0) {
+ LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize);
pstmt.executeBatch();
}
}
@@ -2572,7 +2572,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
- int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
long intLockId = 0;
try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
@@ -2604,13 +2603,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
pstmt.setString(11, rqst.getAgentInfo());
pstmt.addBatch();
- if (intLockId % batchSize == 0) {
- LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + batchSize);
+ if (intLockId % maxBatchSize == 0) {
+ LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + maxBatchSize);
pstmt.executeBatch();
}
}
- if (intLockId % batchSize != 0) {
- LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % batchSize);
+ if (intLockId % maxBatchSize != 0) {
+ LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % maxBatchSize);
pstmt.executeBatch();
}
}
@@ -3299,7 +3298,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Long writeId = rqst.getWriteid();
try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
int insertCounter = 0;
- int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
for (String partName : rqst.getPartitionnames()) {
pstmt.setLong(1, rqst.getTxnid());
pstmt.setString(2, normalizeCase(rqst.getDbname()));
@@ -3310,13 +3308,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
pstmt.addBatch();
insertCounter++;
- if (insertCounter % batchSize == 0) {
- LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+ if (insertCounter % maxBatchSize == 0) {
+ LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
pstmt.executeBatch();
}
}
- if (insertCounter % batchSize != 0) {
- LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+ if (insertCounter % maxBatchSize != 0) {
+ LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize);
pstmt.executeBatch();
}
}
@@ -4246,7 +4244,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
// execute all queries in the list in one batch
- List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
+ List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, maxBatchSize);
return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
} finally {
closeStmt(stmt);