You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2020/04/15 13:01:38 UTC
[hive] branch master updated: HIVE-23107: Remove MIN_HISTORY_LEVEL
table (Laszlo Pinter, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 01035cd HIVE-23107: Remove MIN_HISTORY_LEVEL table (Laszlo Pinter, reviewed by Peter Vary)
01035cd is described below
commit 01035cd6097bd0df1deee35e5d375a80fc6e4dc7
Author: Laszlo Pinter <lp...@cloudera.com>
AuthorDate: Wed Apr 15 15:01:21 2020 +0200
HIVE-23107: Remove MIN_HISTORY_LEVEL table (Laszlo Pinter, reviewed by Peter Vary)
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 2 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 12 --
.../hadoop/hive/metastore/txn/CompactionInfo.java | 1 +
.../hive/metastore/txn/CompactionTxnHandler.java | 132 ++++++++++-----------
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 3 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 53 +--------
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 18 ++-
.../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 11 +-
.../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 4 +
.../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 12 +-
.../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 4 +
.../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 11 +-
.../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 4 +
.../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 11 +-
.../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 4 +
.../sql/postgres/hive-schema-4.0.0.postgres.sql | 11 +-
.../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 4 +
17 files changed, 108 insertions(+), 189 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 54b616e..5fa3d9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -92,7 +92,7 @@ public class Cleaner extends MetaStoreCompactorThread {
try {
handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
startedAt = System.currentTimeMillis();
- long minOpenTxnId = txnHandler.findMinOpenTxnId();
+ long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
clean(compactionInfo, minOpenTxnId);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index f3834cc..2c13e8d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -2045,13 +2045,10 @@ public class TestTxnCommands2 {
// All inserts are committed and hence would expect in TXN_TO_WRITE_ID, 3 entries for acidTbl
// and 2 entries for acidTblPart as each insert would have allocated a writeid.
- // Also MIN_HISTORY_LEVEL won't have any entries as no reference for open txns.
String acidTblWhereClause = " where t2w_database = " + quoteString("default")
+ " and t2w_table = " + quoteString(Table.ACIDTBL.name().toLowerCase());
String acidTblPartWhereClause = " where t2w_database = " + quoteString("default")
+ " and t2w_table = " + quoteString(Table.ACIDTBLPART.name().toLowerCase());
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
- 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
@@ -2091,13 +2088,10 @@ public class TestTxnCommands2 {
// We would expect 4 entries in TXN_TO_WRITE_ID as each insert would have allocated a writeid
// including aborted one.
- // Also MIN_HISTORY_LEVEL will have 1 entry for the open txn.
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
- 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
// The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as
// aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained.
@@ -2112,7 +2106,6 @@ public class TestTxnCommands2 {
// The cleaner will removed aborted txns data/metadata but cannot remove aborted txn2 from TXN_TO_WRITE_ID
// as there is a open txn < aborted txn2. The aborted txn1 < open txn and will be removed.
// Also, committed txn > open txn is retained.
- // MIN_HISTORY_LEVEL will have 1 entry for the open txn.
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
@@ -2120,18 +2113,13 @@ public class TestTxnCommands2 {
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
- 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
// Commit the open txn, which lets the cleanup on TXN_TO_WRITE_ID.
- // Now all txns are removed from MIN_HISTORY_LEVEL. So, all entries from TXN_TO_WRITE_ID would be cleaned.
txnMgr.commitTxn();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
- 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
}
private void verifyDirAndResult(int expectedDeltas) throws Exception {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index bf91ae7..70d63ab 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -225,6 +225,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
cr.setWorkerId(ci.workerId);
cr.setHighestWriteId(ci.highestWriteId);
cr.setErrorMessage(ci.errorMessage);
+
return cr;
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 19a95b6..2344c2d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -221,8 +221,10 @@ class CompactionTxnHandler extends TxnHandler {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " +
- "\"CQ_WORKER_ID\" = NULL WHERE \"CQ_ID\" = " + info.id;
+ String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', "
+ + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = "
+ + "(SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\")"
+ + " WHERE \"CQ_ID\" = " + info.id;
LOG.debug("Going to execute update <" + s + ">");
int updCnt = stmt.executeUpdate(s);
if (updCnt != 1) {
@@ -302,57 +304,6 @@ class CompactionTxnHandler extends TxnHandler {
return findReadyToClean();
}
}
- @Override
- public long findMinOpenTxnId() throws MetaException {
- Connection dbConn = null;
- Statement stmt = null;
- ResultSet rs = null;
- try {
- try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
- return findMinOpenTxnGLB(stmt);
- } catch (SQLException e) {
- LOG.error("Unable to findMinOpenTxnId() due to:" + e.getMessage());
- rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "findMinOpenTxnId");
- throw new MetaException("Unable to execute findMinOpenTxnId() " +
- StringUtils.stringifyException(e));
- } finally {
- close(rs, stmt, dbConn);
- }
- } catch (RetryException e) {
- return findMinOpenTxnId();
- }
- }
-
- /**
- * See doc at {@link TxnStore#findMinOpenTxnId()}
- * Note that {@link #openTxns(OpenTxnRequest)} makes update of NEXT_TXN and MIN_HISTORY_LEVEL
- * a single atomic operation (and no one else should update these tables except the cleaner
- * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher)
- */
- private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException {
- String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"";
- LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction tables not properly " +
- "initialized, no record found in next_txn_id");
- }
- long hwm = rs.getLong(1);
- s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\"";
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- rs.next();
- long minOpenTxnId = rs.getLong(1);
- if(rs.wasNull()) {
- return hwm;
- }
- //since generating new txnid uses select for update on single row in NEXT_TXN_ID
- assert hwm >= minOpenTxnId : "(hwm, minOpenTxnId)=(" + hwm + "," + minOpenTxnId + ")";
- return minOpenTxnId;
- }
/**
* This will remove an entry from the queue after
@@ -523,7 +474,7 @@ class CompactionTxnHandler extends TxnHandler {
}
/**
* Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
- * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+ * min(NEXT_TXN_ID.ntxn_next, min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
*/
@Override
@RetrySemantics.SafeToRetry
@@ -542,25 +493,27 @@ class CompactionTxnHandler extends TxnHandler {
// First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
// If there are no txns which are currently open or aborted in the system, then current value of
// NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
- long minUncommittedTxnId = findMinOpenTxnGLB(stmt);
-
- // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid
- // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid).
- String s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED);
+ String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
+ "SELECT MIN(\"NTXN_NEXT\") AS \"ID\" FROM \"NEXT_TXN_ID\" " +
+ "UNION " +
+ "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " +
+ "UNION " +
+ "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
+ " OR \"TXN_STATE\" = " + quoteChar(TXN_OPEN) +
+ ") \"RES\"";
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
- if (rs.next()) {
- long minAbortedTxnId = rs.getLong(1);
- if (minAbortedTxnId > 0) {
- minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId);
- }
+ if (!rs.next()) {
+ throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID");
}
+ long minUncommitedTxnid = rs.getLong(1);
+
// As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed
// to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table.
- s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommittedTxnId;
+ s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommitedTxnid;
LOG.debug("Going to execute delete <" + s + ">");
int rc = stmt.executeUpdate(s);
- LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId);
+ LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommitedTxnid);
LOG.debug("Going to commit");
dbConn.commit();
@@ -1168,6 +1121,53 @@ class CompactionTxnHandler extends TxnHandler {
setHadoopJobId(hadoopJobId, id);
}
}
+
+  /**
+   * Finds the transaction id the Cleaner uses as its minimum open transaction id, computed
+   * without the removed MIN_HISTORY_LEVEL table.
+   *
+   * <p>If at least one transaction is in the open state, the result is the smaller of the lowest
+   * open TXN_ID and the highest CQ_NEXT_TXN_ID recorded on a READY_FOR_CLEANING compaction entry
+   * (SQL MIN/MAX skip NULLs, so entries without a CQ_NEXT_TXN_ID do not contribute). If no
+   * transaction is open, NEXT_TXN_ID.NTXN_NEXT is returned.
+   *
+   * @return the minimum open transaction id as seen by the Cleaner
+   * @throws MetaException if the transaction tables are not initialized, or the query fails
+   *     with a non-retryable error
+   */
+  @Override
+  @RetrySemantics.Idempotent
+  public long findMinOpenTxnIdForCleaner() throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
+        LOG.debug("Going to execute query <" + query + ">");
+        rs = stmt.executeQuery(query);
+        if (!rs.next()) {
+          throw new MetaException("Transaction tables not properly initialized.");
+        }
+        long numOpenTxns = rs.getLong(1);
+        if (numOpenTxns > 0) {
+          // Smaller of: the lowest open txn, and the highest NEXT_TXN_ID value stored on a
+          // compaction entry when it was moved to READY_FOR_CLEANING.
+          query = "SELECT MIN(\"RES\".\"ID\") FROM (" +
+              "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) +
+              " UNION " +
+              "SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = "
+              + quoteChar(READY_FOR_CLEANING) +
+              ") \"RES\"";
+        } else {
+          // With no open transactions, fall back to the next txn id to be allocated.
+          query = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"";
+        }
+        LOG.debug("Going to execute query <" + query + ">");
+        // Re-executing on the same Statement closes the previous ResultSet (JDBC contract).
+        rs = stmt.executeQuery(query);
+        if (!rs.next()) {
+          throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID");
+        }
+        return rs.getLong(1);
+      } catch (SQLException e) {
+        LOG.error("Unable to findMinOpenTxnIdForCleaner", e);
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findMinOpenTxnIdForCleaner");
+        throw new MetaException("Unable to execute findMinOpenTxnIdForCleaner() " +
+            StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return findMinOpenTxnIdForCleaner();
+    }
+  }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 620c77e..a66e169 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -173,7 +173,8 @@ public final class TxnDbUtil {
" CQ_HIGHEST_WRITE_ID bigint," +
" CQ_META_INFO varchar(2048) for bit data," +
" CQ_HADOOP_JOB_ID varchar(32)," +
- " CQ_ERROR_MESSAGE clob)");
+ " CQ_ERROR_MESSAGE clob," +
+ " CQ_NEXT_TXN_ID bigint)");
stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7d0db0c..be4d63c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -645,48 +645,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
pst.execute();
}
- // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
- // For a single txn we can do it in a single insert. With multiple txns calculating the
- // minOpenTxnId for every insert is not cost effective, so caching the value
- if (txnIds.size() == 1) {
- s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " +
- "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
- LOG.debug("Going to execute query <" + s + ">");
- try (PreparedStatement pstmt = dbConn.prepareStatement(s)) {
- pstmt.setLong(1, txnIds.get(0));
- pstmt.execute();
- }
- LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds);
- } else {
- s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
- LOG.debug("Going to execute query <" + s + ">");
- long minOpenTxnId = -1L;
- try(ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) {
- if (!minOpenTxnIdRs.next()) {
- throw new IllegalStateException("Scalar query returned no rows?!?!!");
- }
- // TXNS table should have at least one entry because we just inserted the newly opened txns.
- // So, min(txn_id) would be a non-zero txnid.
- minOpenTxnId = minOpenTxnIdRs.getLong(1);
- }
-
- assert (minOpenTxnId > 0);
- rows.clear();
- for (long txnId = first; txnId < first + numTxns; txnId++) {
- rows.add(txnId + ", " + minOpenTxnId);
- }
-
- // Insert transaction entries into MIN_HISTORY_LEVEL.
- List<String> inserts = sqlGenerator.createInsertValuesStmt(
- "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows);
- for (String insert : inserts) {
- LOG.debug("Going to execute insert <" + insert + ">");
- stmt.execute(insert);
- }
- LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
- + ") with min_open_txn: " + minOpenTxnId);
- }
-
if (rqst.isSetReplPolicy()) {
List<String> rowsRepl = new ArrayList<>();
for (PreparedStatement pst : insertPreparedStmts) {
@@ -1330,7 +1288,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
}
cleanUpTxnRelatedMetadata(txnid, stmt);
-
// update the key/value associated with the transaction if it has been
// set
if (rqst.isSetKeyValue()) {
@@ -1391,10 +1348,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
"DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid,
"DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid,
"DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid,
- "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid,
"DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
executeQueriesInBatch(stmt, queries, conf);
- LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
}
/**
@@ -4280,20 +4235,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
int numUpdateQueries = queries.size();
- // add delete min history queries to query list
- prefix.setLength(0);
- suffix.setLength(0);
- prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE ");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false);
-
// add delete hive locks queries to query list
prefix.setLength(0);
+ suffix.setLength(0);
prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE ");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
// execute all queries in the list in one batch
List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
- LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
} finally {
closeStmt(stmt);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 41d2e79..87130a5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -360,16 +360,6 @@ public interface TxnStore extends Configurable {
List<CompactionInfo> findReadyToClean() throws MetaException;
/**
- * Returns the smallest txnid that could be seen in open state across all active transactions in
- * the system or {@code NEXT_TXN_ID.NTXN_NEXT} if there are no active transactions, i.e. the
- * smallest txnid that can be seen as unresolved in the whole system. Even if a transaction
- * is opened concurrently with this call it cannot have an id less than what this method returns.
- * @return transaction ID
- */
- @RetrySemantics.ReadOnly
- long findMinOpenTxnId() throws MetaException;
-
- /**
* This will remove an entry from the queue after
* it has been compacted.
*
@@ -517,4 +507,12 @@ public interface TxnStore extends Configurable {
*/
@RetrySemantics.Idempotent
void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException;
+
+ /**
+ * Returns the transaction id that the Cleaner uses as its minimum open transaction id when
+ * deciding what it may clean. This is the replacement for the lookup that was previously
+ * backed by the MIN_HISTORY_LEVEL table.
+ * NOTE(review): CompactionTxnHandler computes this as min(lowest open TXN_ID, highest
+ * CQ_NEXT_TXN_ID of READY_FOR_CLEANING entries) when any txn is open, otherwise
+ * NEXT_TXN_ID.NTXN_NEXT; confirm other implementations must honor the same contract.
+ * @return minimum transaction ID as seen by the Cleaner
+ * @throws MetaException if the transaction tables are not initialized or the metastore
+ * query fails
+ */
+ @RetrySemantics.Idempotent
+ long findMinOpenTxnIdForCleaner() throws MetaException;
}
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 05adbe9..1ace9d3 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -603,7 +603,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_HIGHEST_WRITE_ID bigint,
CQ_META_INFO varchar(2048) for bit data,
CQ_HADOOP_JOB_ID varchar(32),
- CQ_ERROR_MESSAGE clob
+ CQ_ERROR_MESSAGE clob,
+ CQ_NEXT_TXN_ID bigint
);
CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
@@ -667,14 +668,6 @@ CREATE TABLE NEXT_WRITE_ID (
CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
-CREATE TABLE MIN_HISTORY_LEVEL (
- MHL_TXNID bigint NOT NULL,
- MHL_MIN_OPEN_TXNID bigint NOT NULL,
- PRIMARY KEY(MHL_TXNID)
-);
-
-CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
MRL_TXN_ID BIGINT NOT NULL,
MRL_DB_NAME VARCHAR(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 35a2e64..8a3cd56 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -64,5 +64,9 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint;
-- HIVE-22995
ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000);
+-- HIVE-23107
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
+DROP TABLE MIN_HISTORY_LEVEL;
+
-- This needs to be the last thing done. Insert any changes above this line.
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index f3c74bf..2e01777 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1016,6 +1016,7 @@ CREATE TABLE COMPACTION_QUEUE(
CQ_META_INFO varbinary(2048) NULL,
CQ_HADOOP_JOB_ID nvarchar(128) NULL,
CQ_ERROR_MESSAGE varchar(max) NULL,
+ CQ_NEXT_TXN_ID bigint NULL,
PRIMARY KEY CLUSTERED
(
CQ_ID ASC
@@ -1200,17 +1201,6 @@ CREATE TABLE NEXT_WRITE_ID (
CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
-CREATE TABLE MIN_HISTORY_LEVEL (
- MHL_TXNID bigint NOT NULL,
- MHL_MIN_OPEN_TXNID bigint NOT NULL,
-PRIMARY KEY CLUSTERED
-(
- MHL_TXNID ASC
-)
-);
-
-CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
MRL_TXN_ID bigint NOT NULL,
MRL_DB_NAME nvarchar(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 228bb7c..9f39515 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -67,6 +67,10 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E
-- HIVE-22995
ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000);
+-- HIVE-23107
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint NULL;
+DROP TABLE MIN_HISTORY_LEVEL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 626d888..0512a45 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1072,7 +1072,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_HIGHEST_WRITE_ID bigint,
CQ_META_INFO varbinary(2048),
CQ_HADOOP_JOB_ID varchar(32),
- CQ_ERROR_MESSAGE mediumtext
+ CQ_ERROR_MESSAGE mediumtext,
+ CQ_NEXT_TXN_ID bigint
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE COMPLETED_COMPACTIONS (
@@ -1134,14 +1135,6 @@ CREATE TABLE NEXT_WRITE_ID (
CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
-CREATE TABLE MIN_HISTORY_LEVEL (
- MHL_TXNID bigint NOT NULL,
- MHL_MIN_OPEN_TXNID bigint NOT NULL,
- PRIMARY KEY(MHL_TXNID)
-) ENGINE=InnoDB DEFAULT CHARSET=latin1;
-
-CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
MRL_TXN_ID bigint NOT NULL,
MRL_DB_NAME VARCHAR(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 35da7b5..4b82e36 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -68,6 +68,10 @@ ALTER TABLE SCHEDULED_QUERIES ADD COLUMN ACTIVE_EXECUTION_ID INTEGER ;
-- HIVE-22995
ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET latin1 COLLATE latin1_bin;
+-- HIVE-23107
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
+DROP TABLE MIN_HISTORY_LEVEL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index a25f4e4..db398e5 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1053,7 +1053,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_HIGHEST_WRITE_ID NUMBER(19),
CQ_META_INFO BLOB,
CQ_HADOOP_JOB_ID varchar2(32),
- CQ_ERROR_MESSAGE CLOB
+ CQ_ERROR_MESSAGE CLOB,
+ CQ_NEXT_TXN_ID NUMBER(19)
) ROWDEPENDENCIES;
CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
@@ -1115,14 +1116,6 @@ CREATE TABLE NEXT_WRITE_ID (
CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
-CREATE TABLE MIN_HISTORY_LEVEL (
- MHL_TXNID NUMBER(19) NOT NULL,
- MHL_MIN_OPEN_TXNID NUMBER(19) NOT NULL,
- PRIMARY KEY(MHL_TXNID)
-);
-
-CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
MRL_TXN_ID NUMBER NOT NULL,
MRL_DB_NAME VARCHAR(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index d462b4a..1be83fc 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -68,6 +68,10 @@ ALTER TABLE SCHEDULED_QUERIES ADD ACTIVE_EXECUTION_ID number(19);
-- HIVE-22995
ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI VARCHAR2(4000) NULL;
+-- HIVE-23107
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID NUMBER(19);
+DROP TABLE MIN_HISTORY_LEVEL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index 2066340..e6e3016 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1739,7 +1739,8 @@ CREATE TABLE "COMPACTION_QUEUE" (
"CQ_HIGHEST_WRITE_ID" bigint,
"CQ_META_INFO" bytea,
"CQ_HADOOP_JOB_ID" varchar(32),
- "CQ_ERROR_MESSAGE" text
+ "CQ_ERROR_MESSAGE" text,
+ "CQ_NEXT_TXN_ID" bigint
);
CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" (
@@ -1801,14 +1802,6 @@ CREATE TABLE "NEXT_WRITE_ID" (
CREATE UNIQUE INDEX "NEXT_WRITE_ID_IDX" ON "NEXT_WRITE_ID" ("NWI_DATABASE", "NWI_TABLE");
-CREATE TABLE "MIN_HISTORY_LEVEL" (
- "MHL_TXNID" bigint NOT NULL,
- "MHL_MIN_OPEN_TXNID" bigint NOT NULL,
- PRIMARY KEY("MHL_TXNID")
-);
-
-CREATE INDEX "MIN_HISTORY_LEVEL_IDX" ON "MIN_HISTORY_LEVEL" ("MHL_MIN_OPEN_TXNID");
-
CREATE TABLE "MATERIALIZATION_REBUILD_LOCKS" (
"MRL_TXN_ID" bigint NOT NULL,
"MRL_DB_NAME" varchar(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index a50a071..b90cecb 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -199,6 +199,10 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint;
-- HIVE-22995
ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000);
+-- HIVE-23107
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NEXT_TXN_ID" bigint;
+DROP TABLE "MIN_HISTORY_LEVEL";
+
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';