You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2023/05/19 15:01:31 UTC
[hive] branch master updated: HIVE-27198: Delete aborted transactions directly instead of preloading (Mahesh Raju Somalaraju, reviewed by Denys Kuzmenko, Laszlo Vegh)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 61b4c399487 HIVE-27198: Delete aborted transactions directly instead of preloading (Mahesh Raju Somalaraju, reviewed by Denys Kuzmenko, Laszlo Vegh)
61b4c399487 is described below
commit 61b4c3994876a4cf455fdde1aa16d648d0ed4897
Author: Mahesh Raju Somalaraju <ma...@cloudera.com>
AuthorDate: Fri May 19 20:31:22 2023 +0530
HIVE-27198: Delete aborted transactions directly instead of preloading (Mahesh Raju Somalaraju, reviewed by Denys Kuzmenko, Laszlo Vegh)
Closes #4174
---
.../hive/metastore/txn/CompactionTxnHandler.java | 134 ++++++++-------------
1 file changed, 49 insertions(+), 85 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 12240a3ac65..83ceacbc30c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -56,8 +56,9 @@ import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
* methods are not available through the thrift interface.
*/
class CompactionTxnHandler extends TxnHandler {
- static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
- static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ private static final String CLASS_NAME = CompactionTxnHandler.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ private static final String DB_FAILED_TO_CONNECT = "Unable to connect to transaction database: ";
private static DataSource connPoolCompaction;
@@ -87,6 +88,10 @@ class CompactionTxnHandler extends TxnHandler {
"DELETE FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? " +
"AND \"CMC_METRIC_TYPE\" = ?";
+ private static final String DELETE_FAILED_TXNS_SQL =
+ "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") " +
+ "AND (\"TXN_STATE\" = " + TxnStatus.ABORTED + " OR \"TXN_STATE\" = " + TxnStatus.COMMITTED + ") " +
+ "AND \"TXN_ID\" < ?";
public CompactionTxnHandler() {
}
@@ -192,7 +197,7 @@ class CompactionTxnHandler extends TxnHandler {
}
}
} catch (SQLException e) {
- LOG.error("Unable to connect to transaction database " + e.getMessage());
+ LOG.error(DB_FAILED_TO_CONNECT + e.getMessage());
checkRetryable(e,
"findPotentialCompactions(maxAborted:" + abortedThreshold
+ ", abortedTimeThreshold:" + abortedTimeThreshold + ")");
@@ -325,8 +330,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "findNextToCompact(rqst:" + rqst + ")");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
closeStmt(updStmt);
close(rs, stmt, dbConn);
@@ -367,8 +371,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "markCompacted(" + info + ")");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
@@ -460,7 +463,7 @@ class CompactionTxnHandler extends TxnHandler {
} catch (SQLException e) {
LOG.error("Unable to select next element for cleaning, " + e.getMessage());
checkRetryable(e, "findReadyToClean");
- throw new MetaException("Unable to connect to transaction database " + e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
}
} catch (RetryException e) {
return findReadyToClean(minOpenTxnWaterMark, retentionTime);
@@ -509,8 +512,7 @@ class CompactionTxnHandler extends TxnHandler {
} catch (SQLException e) {
LOG.error("Unable to select next element for cleaning, " + e.getMessage());
checkRetryable(e, "findReadyToCleanForAborts");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
}
} catch (RetryException e) {
return findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
@@ -538,7 +540,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "markCleanerStart(" + info + ")");
- throw new MetaException("Unable to connect to transaction database " + e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
closeDbConn(dbConn);
}
@@ -567,7 +569,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "clearCleanerStart(" + info + ")");
- throw new MetaException("Unable to connect to transaction database " + e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
closeDbConn(dbConn);
}
@@ -693,8 +695,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "markCleaned(" + info + "," + isAbortOnly + ")");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
close(rs, pStmt, dbConn);
}
@@ -765,8 +766,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "markCleanedForAborts(" + info + ")");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
close(rs);
closeStmt(pStmt);
@@ -828,8 +828,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "cleanTxnToWriteIdTable");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
close(rs, stmt, dbConn);
}
@@ -922,8 +921,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "removeDuplicateCompletedTxnComponents");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
close(null, stmt, dbConn);
}
@@ -944,61 +942,33 @@ class CompactionTxnHandler extends TxnHandler {
public void cleanEmptyAbortedAndCommittedTxns() throws MetaException {
LOG.info("Start to clean empty aborted or committed TXNS");
try {
- Connection dbConn = null;
- Statement stmt = null;
- ResultSet rs = null;
- try {
- //Aborted and committed are terminal states, so nothing about the txn can change
- //after that, so READ COMMITTED is sufficient.
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
- stmt = dbConn.createStatement();
- /*
- * Only delete aborted / committed transaction in a way that guarantees two things:
- * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window
- * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window
- */
- long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
-
- String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " +
- "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " +
- " (\"TXN_STATE\" = " + TxnStatus.ABORTED + " OR \"TXN_STATE\" = " + TxnStatus.COMMITTED + ") AND "
- + " \"TXN_ID\" < " + lowWaterMark;
- LOG.debug("Going to execute query <{}>", s);
- rs = stmt.executeQuery(s);
- List<Long> txnids = new ArrayList<>();
- while (rs.next()) txnids.add(rs.getLong(1));
- close(rs);
- if(txnids.size() <= 0) {
- return;
- }
- Collections.sort(txnids);//easier to read logs
-
- List<String> queries = new ArrayList<>();
- StringBuilder prefix = new StringBuilder();
- StringBuilder suffix = new StringBuilder();
-
- // Delete from TXNS.
- prefix.append("DELETE FROM \"TXNS\" WHERE ");
-
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false);
-
- for (String query : queries) {
- LOG.debug("Going to execute update <{}>", query);
- int rc = stmt.executeUpdate(query);
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction)) {
+ try (PreparedStatement stmt = dbConn.prepareStatement(DELETE_FAILED_TXNS_SQL)) {
+ //Aborted and committed are terminal states, so nothing about the txn can change
+ //after that, so READ COMMITTED is sufficient.
+ /*
+ * Only delete aborted / committed transaction in a way that guarantees two things:
+ * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window
+ * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window
+ */
+ long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
+ stmt.setLong(1, lowWaterMark);
+ LOG.debug("Going to execute query <{}>", DELETE_FAILED_TXNS_SQL);
+ int rc = stmt.executeUpdate();
LOG.debug("Removed {} empty Aborted and Committed transactions from TXNS", rc);
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to delete from txns table " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(e, "cleanEmptyAbortedTxns");
+ throw new MetaException("Unable to delete from txns table " + e.getMessage());
}
- LOG.info("Aborted and committed transactions removed from TXNS: {}", txnids);
- LOG.debug("Going to commit");
- dbConn.commit();
} catch (SQLException e) {
- LOG.error("Unable to delete from txns table " + e.getMessage());
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
+ LOG.error(DB_FAILED_TO_CONNECT + e.getMessage());
checkRetryable(e, "cleanEmptyAbortedTxns");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
- } finally {
- close(rs, stmt, dbConn);
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
}
} catch (RetryException e) {
cleanEmptyAbortedAndCommittedTxns();
@@ -1039,8 +1009,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "revokeFromLocalWorkers(hostname:" + hostname +")");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
@@ -1086,8 +1055,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "revokeTimedoutWorkers(timeout:" + timeout + ")");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
@@ -1154,8 +1122,7 @@ class CompactionTxnHandler extends TxnHandler {
rollbackDBConn(dbConn);
checkRetryable(e, "findColumnsWithStats(" + ci.tableName +
(ci.partName == null ? "" : "/" + ci.partName) + ")");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
close(rs, pStmt, dbConn);
}
@@ -1211,8 +1178,7 @@ class CompactionTxnHandler extends TxnHandler {
} catch (SQLException e) {
rollbackDBConn(dbConn);
checkRetryable(e, "updateCompactorState(" + ci + "," + compactionTxnId +")");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
close(null, stmt, dbConn);
}
@@ -1367,8 +1333,7 @@ class CompactionTxnHandler extends TxnHandler {
} catch (SQLException e) {
rollbackDBConn(dbConn);
checkRetryable(e, "purgeCompactionHistory()");
- throw new MetaException("Unable to connect to transaction database " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
} finally {
close(rs, stmt, dbConn);
closeStmt(pStmt);
@@ -1451,7 +1416,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(e, "checkFailedCompactions(" + ci + ")");
- LOG.error("Unable to connect to transaction database", e);
+ LOG.error(DB_FAILED_TO_CONNECT, e);
return false;//weren't able to check
} finally {
close(rs, pStmt, dbConn);
@@ -1587,10 +1552,9 @@ class CompactionTxnHandler extends TxnHandler {
e.getMessage());
}
} catch (SQLException e) {
- LOG.error("Unable to connect to transaction database: " + e.getMessage());
+ LOG.error(DB_FAILED_TO_CONNECT + e.getMessage());
checkRetryable(e, "setCleanerRetryRetentionTimeOnError(" + info + ")");
- throw new MetaException("Unable to connect to transaction database: " +
- e.getMessage());
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
}
} catch (RetryException e) {
setCleanerRetryRetentionTimeOnError(info);