You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2023/05/19 15:01:31 UTC

[hive] branch master updated: HIVE-27198: Delete aborted transactions directly instead of preloading (Mahesh Raju Somalaraju, reviewed by Denys Kuzmenko, Laszlo Vegh)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 61b4c399487 HIVE-27198: Delete aborted transactions directly instead of preloading (Mahesh Raju Somalaraju, reviewed by Denys Kuzmenko, Laszlo Vegh)
61b4c399487 is described below

commit 61b4c3994876a4cf455fdde1aa16d648d0ed4897
Author: Mahesh Raju Somalaraju <ma...@cloudera.com>
AuthorDate: Fri May 19 20:31:22 2023 +0530

    HIVE-27198: Delete aborted transactions directly instead of preloading (Mahesh Raju Somalaraju, reviewed by Denys Kuzmenko, Laszlo Vegh)
    
    Closes #4174
---
 .../hive/metastore/txn/CompactionTxnHandler.java   | 134 ++++++++-------------
 1 file changed, 49 insertions(+), 85 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 12240a3ac65..83ceacbc30c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -56,8 +56,9 @@ import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
  * methods are not available through the thrift interface.
  */
 class CompactionTxnHandler extends TxnHandler {
-  static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
-  static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private static final String CLASS_NAME = CompactionTxnHandler.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private static final String DB_FAILED_TO_CONNECT = "Unable to connect to transaction database: ";
 
   private static DataSource connPoolCompaction;
 
@@ -87,6 +88,10 @@ class CompactionTxnHandler extends TxnHandler {
       "DELETE FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? " +
           "AND \"CMC_METRIC_TYPE\" = ?";
 
+  private static final String DELETE_FAILED_TXNS_SQL =
+      "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") " +
+          "AND (\"TXN_STATE\" = " + TxnStatus.ABORTED + " OR \"TXN_STATE\" = " + TxnStatus.COMMITTED + ") " +
+          "AND \"TXN_ID\" < ?";
   public CompactionTxnHandler() {
   }
 
@@ -192,7 +197,7 @@ class CompactionTxnHandler extends TxnHandler {
           }
         }
       } catch (SQLException e) {
-        LOG.error("Unable to connect to transaction database " + e.getMessage());
+        LOG.error(DB_FAILED_TO_CONNECT + e.getMessage());
         checkRetryable(e,
             "findPotentialCompactions(maxAborted:" + abortedThreshold
                 + ", abortedTimeThreshold:" + abortedTimeThreshold + ")");
@@ -325,8 +330,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "findNextToCompact(rqst:" + rqst + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         closeStmt(updStmt);
         close(rs, stmt, dbConn);
@@ -367,8 +371,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "markCompacted(" + info + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
@@ -460,7 +463,7 @@ class CompactionTxnHandler extends TxnHandler {
       } catch (SQLException e) {
         LOG.error("Unable to select next element for cleaning, " + e.getMessage());
         checkRetryable(e, "findReadyToClean");
-        throw new MetaException("Unable to connect to transaction database " + e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       }
     } catch (RetryException e) {
       return findReadyToClean(minOpenTxnWaterMark, retentionTime);
@@ -509,8 +512,7 @@ class CompactionTxnHandler extends TxnHandler {
       } catch (SQLException e) {
         LOG.error("Unable to select next element for cleaning, " + e.getMessage());
         checkRetryable(e, "findReadyToCleanForAborts");
-        throw new MetaException("Unable to connect to transaction database " +
-                e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       }
     } catch (RetryException e) {
       return findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
@@ -538,7 +540,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "markCleanerStart(" + info + ")");
-        throw new MetaException("Unable to connect to transaction database " + e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         closeDbConn(dbConn);
       }
@@ -567,7 +569,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "clearCleanerStart(" + info + ")");
-        throw new MetaException("Unable to connect to transaction database " + e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         closeDbConn(dbConn);
       }
@@ -693,8 +695,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "markCleaned(" + info + "," + isAbortOnly + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         close(rs, pStmt, dbConn);
       }
@@ -765,8 +766,7 @@ class CompactionTxnHandler extends TxnHandler {
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
       checkRetryable(e, "markCleanedForAborts(" + info + ")");
-      throw new MetaException("Unable to connect to transaction database " +
-              e.getMessage());
+      throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
     } finally {
       close(rs);
       closeStmt(pStmt);
@@ -828,8 +828,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "cleanTxnToWriteIdTable");
-        throw new MetaException("Unable to connect to transaction database " +
-                e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         close(rs, stmt, dbConn);
       }
@@ -922,8 +921,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "removeDuplicateCompletedTxnComponents");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         close(null, stmt, dbConn);
       }
@@ -944,61 +942,33 @@ class CompactionTxnHandler extends TxnHandler {
   public void cleanEmptyAbortedAndCommittedTxns() throws MetaException {
     LOG.info("Start to clean empty aborted or committed TXNS");
     try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        //Aborted and committed are terminal states, so nothing about the txn can change
-        //after that, so READ COMMITTED is sufficient.
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
-        stmt = dbConn.createStatement();
-        /*
-         * Only delete aborted / committed transaction in a way that guarantees two things:
-         * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window
-         * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window
-          */
-        long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
-
-        String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " +
-            "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " +
-            " (\"TXN_STATE\" = " + TxnStatus.ABORTED + " OR \"TXN_STATE\" = " + TxnStatus.COMMITTED + ")  AND "
-            + " \"TXN_ID\" < " + lowWaterMark;
-        LOG.debug("Going to execute query <{}>", s);
-        rs = stmt.executeQuery(s);
-        List<Long> txnids = new ArrayList<>();
-        while (rs.next()) txnids.add(rs.getLong(1));
-        close(rs);
-        if(txnids.size() <= 0) {
-          return;
-        }
-        Collections.sort(txnids);//easier to read logs
-
-        List<String> queries = new ArrayList<>();
-        StringBuilder prefix = new StringBuilder();
-        StringBuilder suffix = new StringBuilder();
-
-        // Delete from TXNS.
-        prefix.append("DELETE FROM \"TXNS\" WHERE ");
-
-        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false);
-
-        for (String query : queries) {
-          LOG.debug("Going to execute update <{}>", query);
-          int rc = stmt.executeUpdate(query);
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction)) {
+        try (PreparedStatement stmt = dbConn.prepareStatement(DELETE_FAILED_TXNS_SQL)) {
+          //Aborted and committed are terminal states, so nothing about the txn can change
+          //after that, so READ COMMITTED is sufficient.
+          /*
+           * Only delete aborted / committed transaction in a way that guarantees two things:
+           * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window
+           * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window
+           */
+          long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
+          stmt.setLong(1, lowWaterMark);
+          LOG.debug("Going to execute query <{}>", DELETE_FAILED_TXNS_SQL);
+          int rc = stmt.executeUpdate();
           LOG.debug("Removed {} empty Aborted and Committed transactions from TXNS", rc);
+          LOG.debug("Going to commit");
+          dbConn.commit();
+        } catch (SQLException e) {
+          LOG.error("Unable to delete from txns table " + e.getMessage());
+          LOG.debug("Going to rollback");
+          rollbackDBConn(dbConn);
+          checkRetryable(e, "cleanEmptyAbortedTxns");
+          throw new MetaException("Unable to delete from txns table " + e.getMessage());
         }
-        LOG.info("Aborted and committed transactions removed from TXNS: {}", txnids);
-        LOG.debug("Going to commit");
-        dbConn.commit();
       } catch (SQLException e) {
-        LOG.error("Unable to delete from txns table " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
+        LOG.error(DB_FAILED_TO_CONNECT + e.getMessage());
         checkRetryable(e, "cleanEmptyAbortedTxns");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
-      } finally {
-        close(rs, stmt, dbConn);
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       }
     } catch (RetryException e) {
       cleanEmptyAbortedAndCommittedTxns();
@@ -1039,8 +1009,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "revokeFromLocalWorkers(hostname:" + hostname +")");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
@@ -1086,8 +1055,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "revokeTimedoutWorkers(timeout:" + timeout + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
@@ -1154,8 +1122,7 @@ class CompactionTxnHandler extends TxnHandler {
         rollbackDBConn(dbConn);
         checkRetryable(e, "findColumnsWithStats(" + ci.tableName +
           (ci.partName == null ? "" : "/" + ci.partName) + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         close(rs, pStmt, dbConn);
       }
@@ -1211,8 +1178,7 @@ class CompactionTxnHandler extends TxnHandler {
       } catch (SQLException e) {
         rollbackDBConn(dbConn);
         checkRetryable(e, "updateCompactorState(" + ci + "," + compactionTxnId +")");
-        throw new MetaException("Unable to connect to transaction database " +
-            e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         close(null, stmt, dbConn);
       }
@@ -1367,8 +1333,7 @@ class CompactionTxnHandler extends TxnHandler {
       } catch (SQLException e) {
         rollbackDBConn(dbConn);
         checkRetryable(e, "purgeCompactionHistory()");
-        throw new MetaException("Unable to connect to transaction database " +
-          e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       } finally {
         close(rs, stmt, dbConn);
         closeStmt(pStmt);
@@ -1451,7 +1416,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(e, "checkFailedCompactions(" + ci + ")");
-        LOG.error("Unable to connect to transaction database", e);
+        LOG.error(DB_FAILED_TO_CONNECT, e);
         return false;//weren't able to check
       } finally {
         close(rs, pStmt, dbConn);
@@ -1587,10 +1552,9 @@ class CompactionTxnHandler extends TxnHandler {
                   e.getMessage());
         }
       } catch (SQLException e) {
-        LOG.error("Unable to connect to transaction database: " + e.getMessage());
+        LOG.error(DB_FAILED_TO_CONNECT + e.getMessage());
         checkRetryable(e, "setCleanerRetryRetentionTimeOnError(" + info  + ")");
-        throw new MetaException("Unable to connect to transaction database: " +
-                e.getMessage());
+        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
       }
     } catch (RetryException e) {
       setCleanerRetryRetentionTimeOnError(info);