You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/04/17 15:37:04 UTC

[hive] branch master updated: HIVE-23093: Create new metastore config value for jdbc max batch size (Marton Bod via Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 51dda9a  HIVE-23093: Create new metastore config value for jdbc max batch size (Marton Bod via Peter Vary)
51dda9a is described below

commit 51dda9aa63d6b29ba8642f0497a5d0ce451444ef
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Fri Apr 17 17:36:26 2020 +0200

    HIVE-23093: Create new metastore config value for jdbc max batch size (Marton Bod via Peter Vary)
---
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  6 ++++
 .../hadoop/hive/metastore/txn/TxnDbUtil.java       |  5 ++-
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 42 +++++++++++-----------
 3 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 3bfb0e6..f6097f7 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -644,6 +644,12 @@ public class MetastoreConf {
             "However, it doesn't work correctly with integral values that are not normalized (e.g. have\n" +
             "leading zeroes, like 0012). If metastore direct SQL is enabled and works, this optimization\n" +
             "is also irrelevant."),
+    // Once this maximum batch size is exceeded, the queries should be broken into separate batches.
+    // Note: This value is not passed into the JDBC driver, therefore this batch size limit is not automatically enforced.
+    // Batch construction/splits should be done manually in code using this config value.
+    JDBC_MAX_BATCH_SIZE("metastore.jdbc.max.batch.size", "hive.metastore.jdbc.max.batch.size",
+            1000, new RangeValidator(1, null),
+            "Maximum number of update/delete/insert queries in a single JDBC batch statement (including Statement/PreparedStatement)."),
     KERBEROS_KEYTAB_FILE("metastore.kerberos.keytab.file",
         "hive.metastore.kerberos.keytab.file", "",
         "The path to the Kerberos Keytab file containing the metastore Thrift server's service principal."),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index a66e169..bb29410 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -637,14 +637,13 @@ public final class TxnDbUtil {
   /**
    * @param stmt Statement which will be used for batching and execution.
    * @param queries List of sql queries to execute in a Statement batch.
-   * @param conf Configuration for retrieving max batch size param
+   * @param batchSize maximum number of queries in a single batch
    * @return A list with the number of rows affected by each query in queries.
    * @throws SQLException Thrown if an execution error occurs.
    */
-  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, Configuration conf) throws SQLException {
+  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, int batchSize) throws SQLException {
     List<Integer> affectedRowsByQuery = new ArrayList<>();
     int queryCounter = 0;
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
     for (String query : queries) {
       LOG.debug("Adding query to batch: <" + query + ">");
       queryCounter++;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 962a63d..d080df4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -269,6 +269,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   // (End user) Transaction timeout, in milliseconds.
   private long timeout;
 
+  private int maxBatchSize;
   private String identifierQuoteString; // quotes to use for quoting tables, where necessary
   private long retryInterval;
   private int retryLimit;
@@ -346,6 +347,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
     maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
+    maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
 
     try {
       transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners(
@@ -1221,7 +1223,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete));
             try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
               int insertCounter = 0;
-              int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
               for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
                 pstmt.setString(1, writeEventInfo.getDatabase());
                 pstmt.setString(2, writeEventInfo.getTable());
@@ -1230,13 +1231,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
                 pstmt.addBatch();
                 insertCounter++;
-                if (insertCounter % batchSize == 0) {
-                  LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + batchSize);
+                if (insertCounter % maxBatchSize == 0) {
+                  LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + maxBatchSize);
                   pstmt.executeBatch();
                 }
               }
-              if (insertCounter % batchSize != 0) {
-                LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % batchSize);
+              if (insertCounter % maxBatchSize != 0) {
+                LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % maxBatchSize);
                 pstmt.executeBatch();
               }
             }
@@ -1344,7 +1345,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
 
     // execute all in one batch
-    executeQueriesInBatch(stmt, queryBatch, conf);
+    executeQueriesInBatch(stmt, queryBatch, maxBatchSize);
   }
 
   private void updateKeyValueAssociatedWithTxn(CommitTxnRequest rqst, Statement stmt) throws SQLException {
@@ -2460,7 +2461,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         // For each component in this lock request,
         // add an entry to the txn_components table
         int insertCounter = 0;
-        int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
         for (LockComponent lc : rqst.getComponent()) {
           if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
             //we don't prevent using non-acid resources in a txn but we do lock them
@@ -2483,13 +2483,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
           pstmt.addBatch();
           insertCounter++;
-          if (insertCounter % batchSize == 0) {
-            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+          if (insertCounter % maxBatchSize == 0) {
+            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
             pstmt.executeBatch();
           }
         }
-        if (insertCounter % batchSize != 0) {
-          LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+        if (insertCounter % maxBatchSize != 0) {
+          LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize);
           pstmt.executeBatch();
         }
       }
@@ -2572,7 +2572,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
     String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
     String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
     long intLockId = 0;
 
     try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
@@ -2604,13 +2603,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         pstmt.setString(11, rqst.getAgentInfo());
 
         pstmt.addBatch();
-        if (intLockId % batchSize == 0) {
-          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + batchSize);
+        if (intLockId % maxBatchSize == 0) {
+          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + maxBatchSize);
           pstmt.executeBatch();
         }
       }
-      if (intLockId % batchSize != 0) {
-        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % batchSize);
+      if (intLockId % maxBatchSize != 0) {
+        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % maxBatchSize);
         pstmt.executeBatch();
       }
     }
@@ -3299,7 +3298,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         Long writeId = rqst.getWriteid();
         try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
           int insertCounter = 0;
-          int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
           for (String partName : rqst.getPartitionnames()) {
             pstmt.setLong(1, rqst.getTxnid());
             pstmt.setString(2, normalizeCase(rqst.getDbname()));
@@ -3310,13 +3308,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
             pstmt.addBatch();
             insertCounter++;
-            if (insertCounter % batchSize == 0) {
-              LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+            if (insertCounter % maxBatchSize == 0) {
+              LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
               pstmt.executeBatch();
             }
           }
-          if (insertCounter % batchSize != 0) {
-            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+          if (insertCounter % maxBatchSize != 0) {
+            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize);
             pstmt.executeBatch();
           }
         }
@@ -4246,7 +4244,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
 
       // execute all queries in the list in one batch
-      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
+      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, maxBatchSize);
       return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
     } finally {
       closeStmt(stmt);